You are viewing a plain-text version of this content; the link to the canonical version did not survive the plain-text conversion.
Posted to commits@hbase.apache.org by gj...@apache.org on 2021/09/08 19:19:48 UTC
[hbase] branch branch-1 updated: HBASE-26195 Abort RS if wal sync fails or times out
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new d309276 HBASE-26195 Abort RS if wal sync fails or times out
d309276 is described below
commit d309276121fe00aa30c9b26bd0ac09214f4133d6
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Sep 8 15:15:56 2021 -0400
HBASE-26195 Abort RS if wal sync fails or times out
---
.../apache/hadoop/hbase/regionserver/HRegion.java | 95 +++---
.../hbase/client/TestRollbackFromClient.java | 361 ---------------------
.../regionserver/TestFailedAppendAndSync.java | 1 +
.../hadoop/hbase/regionserver/TestHRegion.java | 60 +---
.../regionserver/wal/TestFSHLogTimedOutSync.java | 184 +++++++++++
5 files changed, 234 insertions(+), 467 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b74731f..3e752cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3339,7 +3339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALEdit walEdit = null;
MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
long txid = 0;
- boolean doRollBackMemstore = false;
+ boolean walSyncSuccess = true;
boolean locked = false;
int cellCount = 0;
/** Keep track of the locks we hold so we can release them in finally clause */
@@ -3705,7 +3705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (updateSeqId) {
updateSequenceId(familyMaps[i].values(), mvccNum);
}
- doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
}
@@ -3721,11 +3720,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// -------------------------
// STEP 7. Sync wal.
// -------------------------
+ walSyncSuccess = false;
if (txid != 0) {
syncOrDefer(txid, durability);
}
+ walSyncSuccess = true;
- doRollBackMemstore = false;
// update memstore size
this.addAndGetGlobalMemstoreSize(addedSize);
@@ -3776,14 +3776,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
success = true;
return addedSize;
+ } catch (Throwable t) {
+ // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
+ // and any replicated clusters.
+ if (!walSyncSuccess) {
+ rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+ }
+ // Rethrow the exception.
+ throw t;
} finally {
- // if the wal sync was unsuccessful, remove keys from memstore
- if (doRollBackMemstore) {
- for (int j = 0; j < familyMaps.length; j++) {
- for(List<Cell> cells:familyMaps[j].values()) {
- rollbackMemstore(cells);
- }
- }
+ // if the wal sync was unsuccessful, complete the mvcc
+ if (!walSyncSuccess) {
if (writeEntry != null) mvcc.complete(writeEntry);
} else {
if (writeEntry != null) {
@@ -4255,33 +4258,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Store store = getStore(family);
size += store.add(cells);
}
-
return size;
}
- /**
- * Remove all the keys listed in the map from the memstore. This method is
- * called when a Put/Delete has updated memstore but subsequently fails to update
- * the wal. This method is then invoked to rollback the memstore.
- */
- private void rollbackMemstore(List<Cell> memstoreCells) {
- rollbackMemstore(null, memstoreCells);
- }
-
- private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
- int kvsRolledback = 0;
- for (Cell cell : memstoreCells) {
- Store store = defaultStore;
- if (store == null) {
- byte[] family = CellUtil.cloneFamily(cell);
- store = getStore(family);
- }
- store.rollback(cell);
- kvsRolledback++;
- }
- LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
- }
-
@Override
public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
for (byte[] family : families) {
@@ -7964,7 +7943,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.writeRequestsCount.increment();
RowLock rowLock = null;
WALKey walKey = null;
- boolean doRollBackMemstore = false;
+ boolean walSyncSuccess = true;
try {
rowLock = getRowLockInternal(row);
assert rowLock != null;
@@ -8092,7 +8071,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Actually write to Memstore now
- doRollBackMemstore = !tempMemstore.isEmpty();
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
if (store.getFamily().getMaxVersions() == 1) {
@@ -8120,27 +8098,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock = null;
}
// sync the transaction log outside the rowlock
+ walSyncSuccess = false;
if(txid != 0){
syncOrDefer(txid, durability);
}
+ walSyncSuccess = true;
if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
getTableName());
}
- doRollBackMemstore = false;
+ } catch (Throwable t) {
+ // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
+ // and any replicated clusters.
+ if (!walSyncSuccess) {
+ rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+ }
+ // Rethrow the exception.
+ throw t;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
- if (doRollBackMemstore) {
- for (Map.Entry<Store, List<Cell>> entry: tempMemstore.entrySet()) {
- rollbackMemstore(entry.getKey(), entry.getValue());
- }
- for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
- entry.getKey().add(entry.getValue());
- }
+ if (!walSyncSuccess) {
if (we != null) {
mvcc.complete(we);
}
@@ -8270,7 +8251,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
RowLock rowLock = null;
WALKey walKey = null;
- boolean doRollBackMemstore = false;
+ boolean walSyncSuccess = true;
long accumulatedResultSize = 0;
List<Cell> allKVs = new ArrayList<Cell>(increment.size());
Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
@@ -8351,7 +8332,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Now write to memstore, a family at a time.
- doRollBackMemstore = !forMemStore.isEmpty();
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
Store store = entry.getKey();
List<Cell> results = entry.getValue();
@@ -8375,24 +8355,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock.release();
rowLock = null;
}
+ walSyncSuccess = false;
// sync the transaction log outside the rowlock
if(txid != 0) {
syncOrDefer(txid, effectiveDurability);
}
- doRollBackMemstore = false;
+ walSyncSuccess = true;
+ } catch (Throwable t) {
+ // WAL sync failed. Aborting to avoid a mismatch between the memstore, WAL,
+ // and any replicated clusters.
+ if (!walSyncSuccess) {
+ rsServices.abort("WAL sync failed, aborting to preserve WAL as source of truth", t);
+ }
+ // Rethrow the exception.
+ throw t;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
WriteEntry we = walKey != null ? walKey.getWriteEntry() : null;
- if (doRollBackMemstore) {
- for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
- rollbackMemstore(entry.getKey(), entry.getValue());
- }
- for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
- entry.getKey().add(entry.getValue());
- }
+ if (!walSyncSuccess) {
if (we != null) {
mvcc.complete(we);
}
@@ -9342,4 +9325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public RegionSplitPolicy getSplitPolicy() {
return this.splitPolicy;
}
+
+ public void setRegionServerServices(RegionServerServices services) {
+ this.rsServices = services;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
deleted file mode 100644
index f91adf4..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category(SmallTests.class)
-public class TestRollbackFromClient {
- @Rule
- public TestName name = new TestName();
- private final static HBaseTestingUtility TEST_UTIL
- = new HBaseTestingUtility();
- private static final byte[] FAMILY = Bytes.toBytes("testFamily");
- private static final int SLAVES = 3;
- private static final byte[] ROW = Bytes.toBytes("testRow");
- private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
- private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
- private static final byte[] VALUE = Bytes.toBytes("testValue");
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName());
- TEST_UTIL.startMiniCluster(SLAVES);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Test
- public void testAppendRollback() throws IOException {
- Updater updateForEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) {
- try {
- Append append = new Append(ROW);
- append.add(FAMILY, QUALIFIER, VALUE);
- append.add(FAMILY, QUALIFIER_V2, VALUE);
- FailedHLog.SHOULD_FAIL.set(true);
- table.append(append);
- } catch (IOException e) {
- // It should fail because the WAL fail also
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 0;
- }
- };
- testRollback(updateForEmptyTable, 1, null);
- testRollback(updateForEmptyTable, 2, null);
-
- final Append preAppend = new Append(ROW);
- preAppend.add(FAMILY, QUALIFIER, VALUE);
- Cell initCell = preAppend.getCellList(FAMILY).get(0);
- Updater updateForNonEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) throws IOException {
- table.append(preAppend);
- try {
- Append append = new Append(ROW);
- append.add(FAMILY, QUALIFIER, VALUE);
- append.add(FAMILY, QUALIFIER_V2, VALUE);
- FailedHLog.SHOULD_FAIL.set(true);
- table.append(append);
- Assert.fail("It should fail because the WAL sync is failed");
- } catch (IOException e) {
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 1;
- }
- };
- testRollback(updateForNonEmptyTable, 1, initCell);
- testRollback(updateForNonEmptyTable, 2, initCell);
- }
-
- @Test
- public void testIncrementRollback() throws IOException {
- Updater updateForEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) {
- try {
- Increment inc = new Increment(ROW);
- inc.addColumn(FAMILY, QUALIFIER, 1);
- inc.addColumn(FAMILY, QUALIFIER_V2, 2);
- FailedHLog.SHOULD_FAIL.set(true);
- table.increment(inc);
- } catch (IOException e) {
- // It should fail because the WAL fail also
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 0;
- }
- };
- testRollback(updateForEmptyTable, 1, null);
- testRollback(updateForEmptyTable, 2, null);
-
- final Increment preIncrement = new Increment(ROW);
- preIncrement.addColumn(FAMILY, QUALIFIER, 1);
- Cell initCell = preIncrement.getCellList(FAMILY).get(0);
- Updater updateForNonEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) throws IOException {
- table.increment(preIncrement);
- try {
- Increment inc = new Increment(ROW);
- inc.addColumn(FAMILY, QUALIFIER, 1);
- inc.addColumn(FAMILY, QUALIFIER_V2, 2);
- FailedHLog.SHOULD_FAIL.set(true);
- table.increment(inc);
- Assert.fail("It should fail because the WAL sync is failed");
- } catch (IOException e) {
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 1;
- }
- };
- testRollback(updateForNonEmptyTable, 1, initCell);
- testRollback(updateForNonEmptyTable, 2, initCell);
- }
-
- @Test
- public void testPutRollback() throws IOException {
- Updater updateForEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) {
- try {
- Put put = new Put(ROW);
- put.addColumn(FAMILY, QUALIFIER, VALUE);
- FailedHLog.SHOULD_FAIL.set(true);
- table.put(put);
- Assert.fail("It should fail because the WAL sync is failed");
- } catch (IOException e) {
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 0;
- }
- };
- testRollback(updateForEmptyTable, 1, null);
- testRollback(updateForEmptyTable, 2, null);
-
- final Put prePut = new Put(ROW);
- prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
- Cell preCell = prePut.getCellList(FAMILY).get(0);
- Updater updateForNonEmptyTable = new Updater() {
- @Override
- public int updateData(Table table, byte[] family) throws IOException {
- table.put(prePut);
- try {
- Put put = new Put(ROW);
- put.addColumn(FAMILY, QUALIFIER, VALUE);
- FailedHLog.SHOULD_FAIL.set(true);
- table.put(put);
- Assert.fail("It should fail because the WAL sync is failed");
- } catch (IOException e) {
- } finally {
- FailedHLog.SHOULD_FAIL.set(false);
- }
- return 1;
- }
- };
- testRollback(updateForNonEmptyTable, 1, preCell);
- testRollback(updateForNonEmptyTable, 2, preCell);
- }
-
- private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
- TableName tableName = TableName.valueOf(this.name.getMethodName());
- HTableDescriptor desc = new HTableDescriptor(tableName);
- HColumnDescriptor col = new HColumnDescriptor(FAMILY);
- col.setMaxVersions(versions);
- desc.addFamily(col);
- TEST_UTIL.getHBaseAdmin().createTable(desc);
- int expected;
- List<Cell> cells;
- try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
- Table table = conn.getTable(tableName)) {
- expected = updater.updateData(table, FAMILY);
- cells = getAllCells(table);
- }
- TEST_UTIL.getHBaseAdmin().disableTable(tableName);
- TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
- assertEquals(expected, cells.size());
- if (initCell != null && cells.isEmpty()) {
- Cell cell = cells.get(0);
- assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
- assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
- assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
- assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
- }
- }
-
- interface Updater {
- int updateData(Table table, byte[] family) throws IOException;
- }
-
- private static List<Cell> getAllCells(Table table) throws IOException {
- List<Cell> cells = new ArrayList<>();
- try (ResultScanner scanner = table.getScanner(new Scan())) {
- for (Result r : scanner) {
- cells.addAll(r.listCells());
- }
- return cells;
- }
- }
-
- public static class FailedDefaultWALProvider extends DefaultWALProvider {
- @Override
- public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
- WAL wal = super.getWAL(identifier, namespace);
- return new FailedHLog(wal);
- }
- }
-
- public static class FailedHLog implements WAL {
- private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false);
- private final WAL delegation;
- FailedHLog(final WAL delegation) {
- this.delegation = delegation;
- }
- @Override
- public void registerWALActionsListener(WALActionsListener listener) {
- delegation.registerWALActionsListener(listener);
- }
-
- @Override
- public boolean unregisterWALActionsListener(WALActionsListener listener) {
- return delegation.unregisterWALActionsListener(listener);
- }
-
- @Override
- public byte[][] rollWriter() throws FailedLogCloseException, IOException {
- return delegation.rollWriter();
- }
-
- @Override
- public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
- return delegation.rollWriter(force);
- }
-
- @Override
- public void shutdown() throws IOException {
- delegation.shutdown();
- }
-
- @Override
- public void close() throws IOException {
- delegation.close();
- }
-
- @Override
- public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
- return delegation.append(htd, info, key, edits, inMemstore);
- }
-
- @Override
- public void sync() throws IOException {
- delegation.sync();
- }
-
- @Override
- public void sync(long txid) throws IOException {
- sync(txid, false);
- }
-
- @Override
- public void sync(boolean forceSync) throws IOException {
- delegation.sync(forceSync);
- }
-
- @Override
- public void sync(long txid, boolean forceSync) throws IOException {
- if (SHOULD_FAIL.get()) {
- throw new IOException("[TESTING] we need the failure!!!");
- }
- delegation.sync(txid, forceSync);
- }
-
- @Override
- public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
- return delegation.startCacheFlush(encodedRegionName, families);
- }
-
- @Override
- public void completeCacheFlush(byte[] encodedRegionName) {
- delegation.completeCacheFlush(encodedRegionName);
- }
-
- @Override
- public void abortCacheFlush(byte[] encodedRegionName) {
- delegation.abortCacheFlush(encodedRegionName);
- }
-
- @Override
- public WALCoprocessorHost getCoprocessorHost() {
- return delegation.getCoprocessorHost();
- }
-
- @Override
- public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
- return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
- }
-
- @Override
- public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
- return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 863d514..19691de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -177,6 +177,7 @@ public class TestFailedAppendAndSync {
boolean threwOnBoth = false;
HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
+ region.setRegionServerServices(services);
try {
// Get some random bytes.
byte[] value = Bytes.toBytes(getName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 338a07d..8dab647 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -38,6 +38,8 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -162,7 +164,6 @@ import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.FaultyFSLog;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -178,6 +179,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -311,57 +313,6 @@ public class TestHRegion {
region = null;
}
- /*
- * This test is for verifying memstore snapshot size is correctly updated in case of rollback
- * See HBASE-10845
- */
- @Test (timeout=60000)
- public void testMemstoreSnapshotSize() throws IOException {
- class MyFaultyFSLog extends FaultyFSLog {
- StoreFlushContext storeFlushCtx;
- public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
- throws IOException {
- super(fs, rootDir, logName, conf);
- }
-
- void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
- this.storeFlushCtx = storeFlushCtx;
- }
-
- @Override
- public void sync(long txid, boolean forceSync) throws IOException {
- storeFlushCtx.prepare();
- super.sync(txid, forceSync);
- }
- }
-
- FileSystem fs = FileSystem.get(CONF);
- Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
- MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
- region = initHRegion(tableName, null, null, name.getMethodName(),
- CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
-
- Store store = region.getStore(COLUMN_FAMILY_BYTES);
- // Get some random bytes.
- byte [] value = Bytes.toBytes(name.getMethodName());
- faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
-
- Put put = new Put(value);
- put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
- faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
-
- boolean threwIOE = false;
- try {
- region.put(put);
- } catch (IOException ioe) {
- threwIOE = true;
- } finally {
- assertTrue("The regionserver should have thrown an exception", threwIOE);
- }
- long sz = store.getFlushableSize();
- assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
- }
-
/**
* Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
*/
@@ -551,6 +502,11 @@ public class TestHRegion {
try {
// Initialize region
region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+ RegionServerServices services = mock(RegionServerServices.class);
+ doNothing().when(services).abort(anyString(), Matchers.<Throwable>any());
+ doReturn(ServerName.valueOf("fake-server", 0, 0L)). when(services).getServerName();
+ region.setRegionServerServices(services);
+
long size = region.getMemstoreSize();
Assert.assertEquals(0, size);
// Put one item into memstore. Measure the size of one item in memstore.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java
new file mode 100644
index 0000000..5554a68
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogTimedOutSync.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests that HRegion asks RegionServerServices to abort when a WAL sync fails or times out.
+ */
+@Category({MediumTests.class, RegionServerTests.class})
+public class TestFSHLogTimedOutSync {
+ private static final Log LOG = LogFactory.getLog(TestFSHLogTimedOutSync.class);
+
+ @Rule public TestName name = new TestName();
+
+ private static final String COLUMN_FAMILY = "MyCF";
+ private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
+ private static final String COLUMN_QUALIFIER = "MyCQ";
+ private static final byte [] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
+ private static HBaseTestingUtility TEST_UTIL;
+ private static Configuration CONF ;
+ private String dir;
+
+ // Table name for the current test; set from the test method name in setup().
+ protected TableName tableName;
+
+ @Before
+ public void setup() throws IOException {
+ TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+ CONF = TEST_UTIL.getConfiguration();
+ // Disable block cache.
+ CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
+ tableName = TableName.valueOf(name.getMethodName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ // Verifies that put, append and increment each call RegionServerServices#abort when the WAL sync throws TimeoutIOException.
+ @Test(timeout=30000)
+ public void testRSAbortWhenSyncTimedOut() throws IOException {
+ // WAL that performs the real sync, then throws TimeoutIOException while throwSyncException is set.
+ class DodgyFSLog extends FSHLog {
+ volatile boolean throwSyncException = false;
+
+ public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+ throws IOException {
+ super(fs, root, logDir, conf);
+ }
+
+ @Override
+ public void sync(long txid) throws IOException {
+ super.sync(txid);
+ if (throwSyncException) {
+ throw new TimeoutIOException("Exception");
+ }
+ }
+
+ @Override
+ public void sync(long txid, boolean force) throws IOException {
+ super.sync(txid, force);
+ if (throwSyncException) {
+ throw new TimeoutIOException("Exception");
+ }
+ }
+ }
+
+ // Mocked RegionServerServices so the abort() calls can be verified without a real server.
+ RegionServerServices services = mock(RegionServerServices.class);
+ FileSystem fs = FileSystem.get(CONF);
+ Path rootDir = new Path(dir + getName());
+ DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+ HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
+ region.setRegionServerServices(services);
+ // Row key and value are derived from the test method name (they are not random).
+ byte[] row = Bytes.toBytes(getName());
+ byte[] value = Bytes.toBytes(getName());
+ // Test Put operation. NOTE(review): the region/WAL is never closed here (see initHRegion javadoc).
+ try {
+ dodgyWAL.throwSyncException = true;
+ Put put = new Put(row);
+ put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
+ region.put(put);
+ fail();
+ } catch (IOException ioe) {
+ assertTrue(ioe instanceof TimeoutIOException);
+ }
+ // Verify that abort was requested (the mock is never reset, so counts accumulate).
+ Mockito.verify(services, Mockito.times(1)).
+ abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+
+ // Test Append operation
+ try {
+ dodgyWAL.throwSyncException = true;
+ Append a = new Append(row);
+ a.setReturnResults(false);
+ a.add(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES, value);
+ region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ fail();
+ } catch (IOException ioe) {
+ assertTrue(ioe instanceof TimeoutIOException);
+ }
+ // Verify that the failed append added a second abort request.
+ Mockito.verify(services, Mockito.times(2)).
+ abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+
+ // Test Increment operation
+ try {
+ dodgyWAL.throwSyncException = true;
+ final Increment inc = new Increment(row);
+ inc.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("qual2"), 1);
+ region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ fail();
+ } catch (IOException ioe) {
+ assertTrue(ioe instanceof TimeoutIOException);
+ }
+ // Verify that the failed increment added a third abort request.
+ Mockito.verify(services, Mockito.times(3)).
+ abort(Mockito.anyString(), Mockito.<Throwable>anyObject());
+ }
+
+ String getName() {
+ return name.getMethodName();
+ }
+
+ /**
+ * @return A region on which you must call
+ * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
+ */
+ public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+ Configuration conf, WAL wal) throws IOException {
+ return TEST_UTIL.createLocalHRegion(tableName.getName(), startKey, stopKey,
+ getName(), conf, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
+ }
+}