You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by gj...@apache.org on 2021/09/08 19:19:48 UTC

[hbase] branch branch-1 updated: HBASE-26195 Abort RS if wal sync fails or times out

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new d309276  HBASE-26195 Abort RS if wal sync fails or times out
d309276 is described below

commit d309276121fe00aa30c9b26bd0ac09214f4133d6
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Sep 8 15:15:56 2021 -0400

    HBASE-26195 Abort RS if wal sync fails or times out
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  95 +++---
 .../hbase/client/TestRollbackFromClient.java       | 361 ---------------------
 .../regionserver/TestFailedAppendAndSync.java      |   1 +
 .../hadoop/hbase/regionserver/TestHRegion.java     |  60 +---
 .../regionserver/wal/TestFSHLogTimedOutSync.java   | 184 +++++++++++
 5 files changed, 234 insertions(+), 467 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b74731f..3e752cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3339,7 +3339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WALEdit walEdit = null;
     MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
     long txid = 0;
-    boolean doRollBackMemstore = false;
+    boolean walSyncSuccess = true;
     boolean locked = false;
     int cellCount = 0;
     /** Keep track of the locks we hold so we can release them in finally clause */
@@ -3705,7 +3705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (updateSeqId) {
           updateSequenceId(familyMaps[i].values(), mvccNum);
         }
-        doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
         addedSize += applyFamilyMapToMemstore(familyMaps[i]);
       }
 
@@ -3721,11 +3720,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // -------------------------
       // STEP 7. Sync wal.
       // -------------------------
+      walSyncSuccess = false;
       if (txid != 0) {
         syncOrDefer(txid, durability);
       }
+      walSyncSuccess = true;
 
-      doRollBackMemstore = false;
       // update memstore size
       this.addAndGetGlobalMemstoreSize(addedSize);
 
@@ -3776,14 +3776,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       success = true;
       return addedSize;
+    } catch (Throwable t) {
+      // If the failure happened during the WAL sync, abort to avoid a mismatch between the
+      // memstore, WAL, and any replicated clusters.
+      if (!walSyncSuccess) {
+        rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+      }
+      // Rethrow the exception.
+      throw t;
     } finally {
-      // if the wal sync was unsuccessful, remove keys from memstore
-      if (doRollBackMemstore) {
-        for (int j = 0; j < familyMaps.length; j++) {
-          for(List<Cell> cells:familyMaps[j].values()) {
-            rollbackMemstore(cells);
-          }
-        }
+      // if the wal sync was unsuccessful, complete the mvcc
+      if (!walSyncSuccess) {
         if (writeEntry != null) mvcc.complete(writeEntry);
       } else {
         if (writeEntry != null) {
@@ -4255,33 +4258,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       Store store = getStore(family);
       size += store.add(cells);
     }
-
      return size;
    }
 
-  /**
-   * Remove all the keys listed in the map from the memstore. This method is
-   * called when a Put/Delete has updated memstore but subsequently fails to update
-   * the wal. This method is then invoked to rollback the memstore.
-   */
-  private void rollbackMemstore(List<Cell> memstoreCells) {
-    rollbackMemstore(null, memstoreCells);
-  }
-
-  private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
-    int kvsRolledback = 0;
-    for (Cell cell : memstoreCells) {
-      Store store = defaultStore;
-      if (store == null) {
-        byte[] family = CellUtil.cloneFamily(cell);
-        store = getStore(family);
-      }
-      store.rollback(cell);
-      kvsRolledback++;
-    }
-    LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
-  }
-
   @Override
   public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
     for (byte[] family : families) {
@@ -7964,7 +7943,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.writeRequestsCount.increment();
     RowLock rowLock = null;
     WALKey walKey = null;
-    boolean doRollBackMemstore = false;
+    boolean walSyncSuccess = true;
     try {
       rowLock = getRowLockInternal(row);
       assert rowLock != null;
@@ -8092,7 +8071,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // Actually write to Memstore now
-          doRollBackMemstore = !tempMemstore.isEmpty();
           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
             Store store = entry.getKey();
             if (store.getFamily().getMaxVersions() == 1) {
@@ -8120,27 +8098,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         rowLock = null;
       }
       // sync the transaction log outside the rowlock
+      walSyncSuccess = false;
       if(txid != 0){
         syncOrDefer(txid, durability);
       }
+      walSyncSuccess = true;
       if (rsServices != null && rsServices.getMetrics() != null) {
         rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
           getTableName());
       }
-      doRollBackMemstore = false;
+    } catch (Throwable t) {
+      // If the failure happened during the WAL sync, abort to avoid a mismatch between the
+      // memstore, WAL, and any replicated clusters.
+      if (!walSyncSuccess) {
+        rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+      }
+      // Rethrow the exception.
+      throw t;
     } finally {
       if (rowLock != null) {
         rowLock.release();
       }
       // if the wal sync was unsuccessful, remove keys from memstore
       WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
-      if (doRollBackMemstore) {
-        for (Map.Entry<Store, List<Cell>> entry: tempMemstore.entrySet()) {
-          rollbackMemstore(entry.getKey(), entry.getValue());
-        }
-        for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
-          entry.getKey().add(entry.getValue());
-        }
+      if (!walSyncSuccess) {
         if (we != null) {
           mvcc.complete(we);
         }
@@ -8270,7 +8251,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
     RowLock rowLock = null;
     WALKey walKey = null;
-    boolean doRollBackMemstore = false;
+    boolean walSyncSuccess = true;
     long accumulatedResultSize = 0;
     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
     Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
@@ -8351,7 +8332,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           // Now write to memstore, a family at a time.
-          doRollBackMemstore = !forMemStore.isEmpty();
           for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
             Store store = entry.getKey();
             List<Cell> results = entry.getValue();
@@ -8375,24 +8355,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         rowLock.release();
         rowLock = null;
       }
+      walSyncSuccess = false;
       // sync the transaction log outside the rowlock
       if(txid != 0) {
         syncOrDefer(txid, effectiveDurability);
       }
-      doRollBackMemstore = false;
+      walSyncSuccess = true;
+    } catch (Throwable t) {
+      // If the failure happened during the WAL sync, abort to avoid a mismatch between the
+      // memstore, WAL, and any replicated clusters.
+      if (!walSyncSuccess) {
+        rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+      }
+      // Rethrow the exception.
+      throw t;
     } finally {
       if (rowLock != null) {
         rowLock.release();
       }
       // if the wal sync was unsuccessful, remove keys from memstore
       WriteEntry we = walKey != null ? walKey.getWriteEntry() : null;
-      if (doRollBackMemstore) {
-        for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
-          rollbackMemstore(entry.getKey(), entry.getValue());
-        }
-        for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
-          entry.getKey().add(entry.getValue());
-        }
+      if (!walSyncSuccess) {
         if (we != null) {
           mvcc.complete(we);
         }
@@ -9342,4 +9325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public RegionSplitPolicy getSplitPolicy() {
     return this.splitPolicy;
   }
+
+  public void setRegionServerServices(RegionServerServices services) {
+    this.rsServices = services;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
deleted file mode 100644
index f91adf4..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category(SmallTests.class)
-public class TestRollbackFromClient {
-  @Rule
-  public TestName name = new TestName();
-  private final static HBaseTestingUtility TEST_UTIL
-    = new HBaseTestingUtility();
-  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
-  private static final int SLAVES = 3;
-  private static final byte[] ROW = Bytes.toBytes("testRow");
-  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
-  private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
-  private static final byte[] VALUE = Bytes.toBytes("testValue");
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName());
-    TEST_UTIL.startMiniCluster(SLAVES);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testAppendRollback() throws IOException {
-    Updater updateForEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) {
-        try {
-          Append append = new Append(ROW);
-          append.add(FAMILY, QUALIFIER, VALUE);
-          append.add(FAMILY, QUALIFIER_V2, VALUE);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.append(append);
-        } catch (IOException e) {
-          // It should fail because the WAL fail also
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 0;
-      }
-    };
-    testRollback(updateForEmptyTable, 1, null);
-    testRollback(updateForEmptyTable, 2, null);
-
-    final Append preAppend = new Append(ROW);
-    preAppend.add(FAMILY, QUALIFIER, VALUE);
-    Cell initCell = preAppend.getCellList(FAMILY).get(0);
-    Updater updateForNonEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) throws IOException {
-        table.append(preAppend);
-        try {
-          Append append = new Append(ROW);
-          append.add(FAMILY, QUALIFIER, VALUE);
-          append.add(FAMILY, QUALIFIER_V2, VALUE);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.append(append);
-          Assert.fail("It should fail because the WAL sync is failed");
-        } catch (IOException e) {
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 1;
-      }
-    };
-    testRollback(updateForNonEmptyTable, 1, initCell);
-    testRollback(updateForNonEmptyTable, 2, initCell);
-  }
-
-  @Test
-  public void testIncrementRollback() throws IOException {
-    Updater updateForEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) {
-        try {
-          Increment inc = new Increment(ROW);
-          inc.addColumn(FAMILY, QUALIFIER, 1);
-          inc.addColumn(FAMILY, QUALIFIER_V2, 2);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.increment(inc);
-        } catch (IOException e) {
-          // It should fail because the WAL fail also
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 0;
-      }
-    };
-    testRollback(updateForEmptyTable, 1, null);
-    testRollback(updateForEmptyTable, 2, null);
-
-    final Increment preIncrement = new Increment(ROW);
-    preIncrement.addColumn(FAMILY, QUALIFIER, 1);
-    Cell initCell = preIncrement.getCellList(FAMILY).get(0);
-    Updater updateForNonEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) throws IOException {
-        table.increment(preIncrement);
-        try {
-          Increment inc = new Increment(ROW);
-          inc.addColumn(FAMILY, QUALIFIER, 1);
-          inc.addColumn(FAMILY, QUALIFIER_V2, 2);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.increment(inc);
-          Assert.fail("It should fail because the WAL sync is failed");
-        } catch (IOException e) {
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 1;
-      }
-    };
-    testRollback(updateForNonEmptyTable, 1, initCell);
-    testRollback(updateForNonEmptyTable, 2, initCell);
-  }
-
-  @Test
-  public void testPutRollback() throws IOException {
-    Updater updateForEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) {
-        try {
-          Put put = new Put(ROW);
-          put.addColumn(FAMILY, QUALIFIER, VALUE);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.put(put);
-          Assert.fail("It should fail because the WAL sync is failed");
-        } catch (IOException e) {
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 0;
-      }
-    };
-    testRollback(updateForEmptyTable, 1, null);
-    testRollback(updateForEmptyTable, 2, null);
-
-    final Put prePut = new Put(ROW);
-    prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
-    Cell preCell = prePut.getCellList(FAMILY).get(0);
-    Updater updateForNonEmptyTable = new Updater() {
-      @Override
-      public int updateData(Table table, byte[] family) throws IOException {
-        table.put(prePut);
-        try {
-          Put put = new Put(ROW);
-          put.addColumn(FAMILY, QUALIFIER, VALUE);
-          FailedHLog.SHOULD_FAIL.set(true);
-          table.put(put);
-          Assert.fail("It should fail because the WAL sync is failed");
-        } catch (IOException e) {
-        } finally {
-          FailedHLog.SHOULD_FAIL.set(false);
-        }
-        return 1;
-      }
-    };
-    testRollback(updateForNonEmptyTable, 1, preCell);
-    testRollback(updateForNonEmptyTable, 2, preCell);
-  }
-
-  private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
-    TableName tableName = TableName.valueOf(this.name.getMethodName());
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor col = new HColumnDescriptor(FAMILY);
-    col.setMaxVersions(versions);
-    desc.addFamily(col);
-    TEST_UTIL.getHBaseAdmin().createTable(desc);
-    int expected;
-    List<Cell> cells;
-    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-        Table table = conn.getTable(tableName)) {
-      expected = updater.updateData(table, FAMILY);
-      cells = getAllCells(table);
-    }
-    TEST_UTIL.getHBaseAdmin().disableTable(tableName);
-    TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
-    assertEquals(expected, cells.size());
-    if (initCell != null && cells.isEmpty()) {
-      Cell cell = cells.get(0);
-      assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
-      assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
-      assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
-      assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
-    }
-  }
-
-  interface Updater {
-    int updateData(Table table, byte[] family) throws IOException;
-  }
-
-  private static List<Cell> getAllCells(Table table) throws IOException {
-    List<Cell> cells = new ArrayList<>();
-    try (ResultScanner scanner = table.getScanner(new Scan())) {
-      for (Result r : scanner) {
-        cells.addAll(r.listCells());
-      }
-      return cells;
-    }
-  }
-
-  public static class FailedDefaultWALProvider extends DefaultWALProvider {
-    @Override
-    public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
-      WAL wal = super.getWAL(identifier, namespace);
-      return new FailedHLog(wal);
-    }
-  }
-
-  public static class FailedHLog implements WAL {
-    private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false);
-    private final WAL delegation;
-    FailedHLog(final WAL delegation) {
-      this.delegation = delegation;
-    }
-    @Override
-    public void registerWALActionsListener(WALActionsListener listener) {
-      delegation.registerWALActionsListener(listener);
-    }
-
-    @Override
-    public boolean unregisterWALActionsListener(WALActionsListener listener) {
-      return delegation.unregisterWALActionsListener(listener);
-    }
-
-    @Override
-    public byte[][] rollWriter() throws FailedLogCloseException, IOException {
-      return delegation.rollWriter();
-    }
-
-    @Override
-    public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
-      return delegation.rollWriter(force);
-    }
-
-    @Override
-    public void shutdown() throws IOException {
-      delegation.shutdown();
-    }
-
-    @Override
-    public void close() throws IOException {
-      delegation.close();
-    }
-
-    @Override
-    public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
-      return delegation.append(htd, info, key, edits, inMemstore);
-    }
-
-    @Override
-    public void sync() throws IOException {
-      delegation.sync();
-    }
-
-    @Override
-    public void sync(long txid) throws IOException {
-      sync(txid, false);
-    }
-
-    @Override
-    public void sync(boolean forceSync) throws IOException {
-      delegation.sync(forceSync);
-    }
-
-    @Override
-    public void sync(long txid, boolean forceSync) throws IOException {
-      if (SHOULD_FAIL.get()) {
-        throw new IOException("[TESTING] we need the failure!!!");
-      }
-      delegation.sync(txid, forceSync);
-    }
-
-    @Override
-    public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
-      return delegation.startCacheFlush(encodedRegionName, families);
-    }
-
-    @Override
-    public void completeCacheFlush(byte[] encodedRegionName) {
-      delegation.completeCacheFlush(encodedRegionName);
-    }
-
-    @Override
-    public void abortCacheFlush(byte[] encodedRegionName) {
-      delegation.abortCacheFlush(encodedRegionName);
-    }
-
-    @Override
-    public WALCoprocessorHost getCoprocessorHost() {
-      return delegation.getCoprocessorHost();
-    }
-
-    @Override
-    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
-      return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
-    }
-
-    @Override
-    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
-      return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 863d514..19691de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -177,6 +177,7 @@ public class TestFailedAppendAndSync {
     boolean threwOnBoth = false;
 
     HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
+    region.setRegionServerServices(services);
     try {
       // Get some random bytes.
       byte[] value = Bytes.toBytes(getName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 338a07d..8dab647 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -38,6 +38,8 @@ import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -162,7 +164,6 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.FaultyFSLog;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -178,6 +179,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -311,57 +313,6 @@ public class TestHRegion {
     region = null;
   }
 
-  /*
-   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
-   * See HBASE-10845
-   */
-  @Test (timeout=60000)
-  public void testMemstoreSnapshotSize() throws IOException {
-    class MyFaultyFSLog extends FaultyFSLog {
-      StoreFlushContext storeFlushCtx;
-      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
-          throws IOException {
-        super(fs, rootDir, logName, conf);
-      }
-
-      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
-        this.storeFlushCtx = storeFlushCtx;
-      }
-
-      @Override
-      public void sync(long txid, boolean forceSync) throws IOException {
-        storeFlushCtx.prepare();
-        super.sync(txid, forceSync);
-      }
-    }
-
-    FileSystem fs = FileSystem.get(CONF);
-    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
-    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
-    region = initHRegion(tableName, null, null, name.getMethodName(),
-        CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
-
-    Store store = region.getStore(COLUMN_FAMILY_BYTES);
-    // Get some random bytes.
-    byte [] value = Bytes.toBytes(name.getMethodName());
-    faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
-
-    Put put = new Put(value);
-    put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
-    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
-
-    boolean threwIOE = false;
-    try {
-      region.put(put);
-    } catch (IOException ioe) {
-      threwIOE = true;
-    } finally {
-      assertTrue("The regionserver should have thrown an exception", threwIOE);
-    }
-    long sz = store.getFlushableSize();
-    assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
-  }
-
   /**
    * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
    */
@@ -551,6 +502,11 @@ public class TestHRegion {
         try {
           // Initialize region
           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+          RegionServerServices services = mock(RegionServerServices.class);
+          doNothing().when(services).abort(anyString(), Matchers.<Throwable>any());
+          doReturn(ServerName.valueOf("fake-server", 0, 0L)). when(services).getServerName();
+          region.setRegionServerServices(services);
+
           long size = region.getMemstoreSize();
           Assert.assertEquals(0, size);
           // Put one item into memstore.  Measure the size of one item in memstore.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java
new file mode 100644
index 0000000..5554a68
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/*
+  Tests that the RS aborts when a WAL sync fails or times out.
+ */
+@Category({MediumTests.class, RegionServerTests.class})
+public class TestFSHLogTimedOutSync {
+  private static final Log LOG = LogFactory.getLog(TestFSHLogTimedOutSync.class);
+
+  @Rule public TestName name = new TestName();
+
+  private static final String COLUMN_FAMILY = "MyCF";
+  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
+  private static final String COLUMN_QUALIFIER = "MyCQ";
+  private static final byte [] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Configuration CONF ;
+  private String dir;
+
+  // Test names
+  protected TableName tableName;
+
+  @Before
+  public void setup() throws IOException {
+    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+    CONF = TEST_UTIL.getConfiguration();
+    // Disable block cache.
+    CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
+    tableName = TableName.valueOf(name.getMethodName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  // Test that RS aborts in case of put, append and increment when sync fails or times out.
+  @Test(timeout=30000)
+  public void testRSAbortWhenSyncTimedOut() throws IOException {
+    // Dodgy WAL. Will throw exceptions when flags set.
+    class DodgyFSLog extends FSHLog {
+      volatile boolean throwSyncException = false;
+
+      public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+        throws IOException {
+        super(fs, root, logDir, conf);
+      }
+
+      @Override
+      public void sync(long txid) throws IOException {
+        super.sync(txid);
+        if (throwSyncException) {
+          throw new TimeoutIOException("Exception");
+        }
+      }
+
+      @Override
+      public void sync(long txid, boolean force) throws IOException {
+        super.sync(txid, force);
+        if (throwSyncException) {
+          throw new TimeoutIOException("Exception");
+        }
+      }
+    }
+
+    // Mock the region server services so that abort() calls can be verified.
+    RegionServerServices services = mock(RegionServerServices.class);
+    FileSystem fs = FileSystem.get(CONF);
+    Path rootDir = new Path(dir + getName());
+    DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+    HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
+    region.setRegionServerServices(services);
+    // Use the test method name as both the row key and the value.
+    byte[] row = Bytes.toBytes(getName());
+    byte[] value = Bytes.toBytes(getName());
+    // Test Put operation
+    try {
+      dodgyWAL.throwSyncException = true;
+      Put put = new Put(row);
+      put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
+      region.put(put);
+      fail();
+    } catch (IOException ioe) {
+      assertTrue(ioe instanceof TimeoutIOException);
+    }
+    // Verify that RS aborts
+    Mockito.verify(services, Mockito.times(1)).
+      abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+
+    // Test Append operation
+    try {
+      dodgyWAL.throwSyncException = true;
+      Append a = new Append(row);
+      a.setReturnResults(false);
+      a.add(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
+      region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
+      fail();
+    } catch (IOException ioe) {
+      assertTrue(ioe instanceof TimeoutIOException);
+    }
+    // Verify that RS aborts
+    Mockito.verify(services, Mockito.times(2)).
+      abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+
+    // Test Increment operation
+    try {
+      dodgyWAL.throwSyncException = true;
+      final Increment inc = new Increment(row);
+      inc.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("qual2"), 1);
+      region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+      fail();
+    } catch (IOException ioe) {
+      assertTrue(ioe instanceof TimeoutIOException);
+    }
+    // Verify that RS aborts
+    Mockito.verify(services, Mockito.times(3)).
+      abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+  }
+
+  String getName() {
+    return name.getMethodName();
+  }
+
+  /**
+   * @return A region on which you must call
+   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
+   */
+  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+    Configuration conf, WAL wal) throws IOException {
+    return TEST_UTIL.createLocalHRegion(tableName.getName(), startKey, stopKey,
+      getName(), conf, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
+  }
+}