You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/01/25 14:30:05 UTC
hbase git commit: HBASE-17519 Rollback the removed cells (ChiaPing
Tsai)
Repository: hbase
Updated Branches:
refs/heads/branch-1 19f9a1a64 -> ed023058d
HBASE-17519 Rollback the removed cells (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ed023058
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ed023058
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ed023058
Branch: refs/heads/branch-1
Commit: ed023058d23d98ab56030b20ae34ececfef1136e
Parents: 19f9a1a
Author: tedyu <yu...@gmail.com>
Authored: Wed Jan 25 06:29:56 2017 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Jan 25 06:29:56 2017 -0800
----------------------------------------------------------------------
.../hbase/regionserver/DefaultMemStore.java | 12 +-
.../hadoop/hbase/regionserver/HRegion.java | 85 +++--
.../hadoop/hbase/regionserver/HStore.java | 5 +-
.../hadoop/hbase/regionserver/MemStore.java | 3 +-
.../apache/hadoop/hbase/regionserver/Store.java | 3 +-
.../hbase/client/TestRollbackFromClient.java | 352 +++++++++++++++++++
.../hbase/regionserver/TestDefaultMemStore.java | 6 +-
7 files changed, 418 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index a47cafd..7b7446a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -552,7 +552,7 @@ public class DefaultMemStore implements MemStore {
// 'now' and a 0 memstoreTS == immediately visible
List<Cell> cells = new ArrayList<Cell>(1);
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
- return upsert(cells, 1L);
+ return upsert(cells, 1L, null);
}
/**
@@ -571,13 +571,14 @@ public class DefaultMemStore implements MemStore {
*
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param removedCells collect the removed cells. It can be null.
* @return change in memstore size
*/
@Override
- public long upsert(Iterable<Cell> cells, long readpoint) {
+ public long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) {
long size = 0;
for (Cell cell : cells) {
- size += upsert(cell, readpoint);
+ size += upsert(cell, readpoint, removedCells);
}
return size;
}
@@ -596,7 +597,7 @@ public class DefaultMemStore implements MemStore {
* @param cell
* @return change in size of MemStore
*/
- private long upsert(Cell cell, long readpoint) {
+ private long upsert(Cell cell, long readpoint, List<Cell> removedCells) {
// Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially
@@ -635,6 +636,9 @@ public class DefaultMemStore implements MemStore {
long delta = heapSizeChange(cur, true);
addedSize -= delta;
this.size.addAndGet(-delta);
+ if (removedCells != null) {
+ removedCells.add(cur);
+ }
it.remove();
setOldestEditTimeToNow();
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 209f6ad..9ed10d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4025,11 +4025,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* the wal. This method is then invoked to rollback the memstore.
*/
private void rollbackMemstore(List<Cell> memstoreCells) {
- int kvsRolledback = 0;
+ rollbackMemstore(null, memstoreCells);
+ }
+ private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
+ int kvsRolledback = 0;
for (Cell cell : memstoreCells) {
- byte[] family = CellUtil.cloneFamily(cell);
- Store store = getStore(family);
+ Store store = defaultStore;
+ if (store == null) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ store = getStore(family);
+ }
store.rollback(cell);
kvsRolledback++;
}
@@ -7586,12 +7592,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] row = mutate.getRow();
checkRow(row, op.toString());
checkFamilies(mutate.getFamilyCellMap().keySet());
- boolean flush = false;
Durability durability = getEffectiveDurability(mutate.getDurability());
boolean writeToWAL = durability != Durability.SKIP_WAL;
WALEdit walEdits = null;
List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
+ Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
long size = 0;
long txid = 0;
checkReadOnly();
@@ -7718,26 +7724,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Actually write to Memstore now
- if (!tempMemstore.isEmpty()) {
- for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
- } else {
- // otherwise keep older versions around
- size += store.add(entry.getValue());
- if (!entry.getValue().isEmpty()) {
- doRollBackMemstore = true;
- }
+ doRollBackMemstore = !tempMemstore.isEmpty();
+ for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
+ Store store = entry.getKey();
+ if (store.getFamily().getMaxVersions() == 1) {
+ List<Cell> removedCells = removedCellsForMemStore.get(store);
+ if (removedCells == null) {
+ removedCells = new ArrayList<>();
+ removedCellsForMemStore.put(store, removedCells);
}
- // We add to all KVs here whereas when doing increment, we do it
- // earlier... why?
- allKVs.addAll(entry.getValue());
+ // upsert if VERSIONS for this CF == 1
+ size += store.upsert(entry.getValue(), getSmallestReadPoint(), removedCells);
+ } else {
+ // otherwise keep older versions around
+ size += store.add(entry.getValue());
}
-
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
+ // We add to all KVs here whereas when doing increment, we do it
+ // earlier... why?
+ allKVs.addAll(entry.getValue());
}
} finally {
this.updatesLock.readLock().unlock();
@@ -7763,7 +7767,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// if the wal sync was unsuccessful, remove keys from memstore
WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
if (doRollBackMemstore) {
- rollbackMemstore(allKVs);
+ for (Map.Entry<Store, List<Cell>> entry: tempMemstore.entrySet()) {
+ rollbackMemstore(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
+ entry.getKey().add(entry.getValue());
+ }
if (we != null) mvcc.complete(we);
} else if (we != null) {
mvcc.completeAndWait(we);
@@ -7775,12 +7784,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.metricsRegion != null) {
this.metricsRegion.updateAppend();
}
-
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
- }
-
+ if (isFlushSize(this.addAndGetGlobalMemstoreSize(size))) requestFlush();
return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
@@ -7887,7 +7891,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean doRollBackMemstore = false;
long accumulatedResultSize = 0;
List<Cell> allKVs = new ArrayList<Cell>(increment.size());
- List<Cell> memstoreCells = new ArrayList<Cell>();
+ Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
+ Map<Store, List<Cell>> forMemStore = new HashMap<>();
Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
try {
rowLock = getRowLockInternal(increment.getRow(), false);
@@ -7907,7 +7912,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALEdit walEdits = null;
// Process increments a Store/family at a time.
// Accumulate edits for memstore to add later after we've added to WAL.
- Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
byte [] columnFamilyName = entry.getKey();
List<Cell> increments = entry.getValue();
@@ -7956,19 +7960,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Now write to memstore, a family at a time.
+ doRollBackMemstore = !forMemStore.isEmpty();
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
Store store = entry.getKey();
List<Cell> results = entry.getValue();
if (store.getFamily().getMaxVersions() == 1) {
+ List<Cell> removedCells = removedCellsForMemStore.get(store);
+ if (removedCells == null) {
+ removedCells = new ArrayList<>();
+ removedCellsForMemStore.put(store, removedCells);
+ }
// Upsert if VERSIONS for this CF == 1
- accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
- // TODO: St.Ack 20151222 Why no rollback in this case?
+ accumulatedResultSize += store.upsert(results, getSmallestReadPoint(), removedCells);
} else {
// Otherwise keep older versions around
accumulatedResultSize += store.add(entry.getValue());
- if (!entry.getValue().isEmpty()) {
- doRollBackMemstore = true;
- }
}
}
} finally {
@@ -7993,7 +7999,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
- rollbackMemstore(memstoreCells);
+ for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
+ rollbackMemstore(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
+ entry.getKey().add(entry.getValue());
+ }
if (walKey != null) mvcc.complete(walKey.getWriteEntry());
} else {
if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a15bf13..b2cc3a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2388,10 +2388,11 @@ public class HStore implements Store {
}
@Override
- public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+ public long upsert(Iterable<Cell> cells, long readpoint,
+ List<Cell> removedCells) throws IOException {
this.lock.readLock().lock();
try {
- return this.memstore.upsert(cells, readpoint);
+ return this.memstore.upsert(cells, readpoint, removedCells);
} finally {
this.lock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index a885d79..5e5a1ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -135,9 +135,10 @@ public interface MemStore extends HeapSize {
* only see each KeyValue update as atomic.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate Cells.
+ * @param removedCells collect the removed cells. It can be null.
* @return change in memstore size
*/
- long upsert(Iterable<Cell> cells, long readpoint);
+ long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells);
/**
* @return scanner over the memstore. This might include scanner over the snapshot when one is
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index ccdc523..9d5d3b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -138,10 +138,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
+ * @param removedCells collect the removed cells. It can be null.
* @return memstore size delta
* @throws IOException
*/
- long upsert(Iterable<Cell> cells, long readpoint) throws IOException;
+ long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) throws IOException;
/**
* Adds a value to the memstore
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
new file mode 100644
index 0000000..9230f31
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java
@@ -0,0 +1,352 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(SmallTests.class)
+public class TestRollbackFromClient {
+ @Rule
+ public TestName name = new TestName();
+ private final static HBaseTestingUtility TEST_UTIL
+ = new HBaseTestingUtility();
+ private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+ private static final int SLAVES = 3;
+ private static final byte[] ROW = Bytes.toBytes("testRow");
+ private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+ private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
+ private static final byte[] VALUE = Bytes.toBytes("testValue");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName());
+ TEST_UTIL.startMiniCluster(SLAVES);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testAppendRollback() throws IOException {
+ Updater updateForEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) {
+ try {
+ Append append = new Append(ROW);
+ append.add(FAMILY, QUALIFIER, VALUE);
+ append.add(FAMILY, QUALIFIER_V2, VALUE);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.append(append);
+ } catch (IOException e) {
+ // It should fail because the WAL fail also
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 0;
+ }
+ };
+ testRollback(updateForEmptyTable, 1, null);
+ testRollback(updateForEmptyTable, 2, null);
+
+ final Append preAppend = new Append(ROW);
+ preAppend.add(FAMILY, QUALIFIER, VALUE);
+ Cell initCell = preAppend.getCellList(FAMILY).get(0);
+ Updater updateForNonEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) throws IOException {
+ table.append(preAppend);
+ try {
+ Append append = new Append(ROW);
+ append.add(FAMILY, QUALIFIER, VALUE);
+ append.add(FAMILY, QUALIFIER_V2, VALUE);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.append(append);
+ Assert.fail("It should fail because the WAL sync is failed");
+ } catch (IOException e) {
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 1;
+ }
+ };
+ testRollback(updateForNonEmptyTable, 1, initCell);
+ testRollback(updateForNonEmptyTable, 2, initCell);
+ }
+
+ @Test
+ public void testIncrementRollback() throws IOException {
+ Updater updateForEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) {
+ try {
+ Increment inc = new Increment(ROW);
+ inc.addColumn(FAMILY, QUALIFIER, 1);
+ inc.addColumn(FAMILY, QUALIFIER_V2, 2);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.increment(inc);
+ } catch (IOException e) {
+ // It should fail because the WAL fail also
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 0;
+ }
+ };
+ testRollback(updateForEmptyTable, 1, null);
+ testRollback(updateForEmptyTable, 2, null);
+
+ final Increment preIncrement = new Increment(ROW);
+ preIncrement.addColumn(FAMILY, QUALIFIER, 1);
+ Cell initCell = preIncrement.getCellList(FAMILY).get(0);
+ Updater updateForNonEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) throws IOException {
+ table.increment(preIncrement);
+ try {
+ Increment inc = new Increment(ROW);
+ inc.addColumn(FAMILY, QUALIFIER, 1);
+ inc.addColumn(FAMILY, QUALIFIER_V2, 2);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.increment(inc);
+ Assert.fail("It should fail because the WAL sync is failed");
+ } catch (IOException e) {
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 1;
+ }
+ };
+ testRollback(updateForNonEmptyTable, 1, initCell);
+ testRollback(updateForNonEmptyTable, 2, initCell);
+ }
+
+ @Test
+ public void testPutRollback() throws IOException {
+ Updater updateForEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) {
+ try {
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, VALUE);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.put(put);
+ Assert.fail("It should fail because the WAL sync is failed");
+ } catch (IOException e) {
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 0;
+ }
+ };
+ testRollback(updateForEmptyTable, 1, null);
+ testRollback(updateForEmptyTable, 2, null);
+
+ final Put prePut = new Put(ROW);
+ prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
+ Cell preCell = prePut.getCellList(FAMILY).get(0);
+ Updater updateForNonEmptyTable = new Updater() {
+ @Override
+ public int updateData(Table table, byte[] family) throws IOException {
+ table.put(prePut);
+ try {
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, VALUE);
+ FailedHLog.SHOULD_FAIL.set(true);
+ table.put(put);
+ Assert.fail("It should fail because the WAL sync is failed");
+ } catch (IOException e) {
+ } finally {
+ FailedHLog.SHOULD_FAIL.set(false);
+ }
+ return 1;
+ }
+ };
+ testRollback(updateForNonEmptyTable, 1, preCell);
+ testRollback(updateForNonEmptyTable, 2, preCell);
+ }
+
+ private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
+ TableName tableName = TableName.valueOf(this.name.getMethodName());
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ HColumnDescriptor col = new HColumnDescriptor(FAMILY);
+ col.setMaxVersions(versions);
+ desc.addFamily(col);
+ TEST_UTIL.getHBaseAdmin().createTable(desc);
+ int expected;
+ List<Cell> cells;
+ try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ Table table = conn.getTable(tableName)) {
+ expected = updater.updateData(table, FAMILY);
+ cells = getAllCells(table);
+ }
+ TEST_UTIL.getHBaseAdmin().disableTable(tableName);
+ TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
+ assertEquals(expected, cells.size());
+ if (initCell != null && cells.isEmpty()) {
+ Cell cell = cells.get(0);
+ assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
+ assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
+ assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
+ assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
+ }
+ }
+
+ interface Updater {
+ int updateData(Table table, byte[] family) throws IOException;
+ }
+
+ private static List<Cell> getAllCells(Table table) throws IOException {
+ List<Cell> cells = new ArrayList<>();
+ try (ResultScanner scanner = table.getScanner(new Scan())) {
+ for (Result r : scanner) {
+ cells.addAll(r.listCells());
+ }
+ return cells;
+ }
+ }
+
+ public static class FailedDefaultWALProvider extends DefaultWALProvider {
+ @Override
+ public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
+ WAL wal = super.getWAL(identifier, namespace);
+ return new FailedHLog(wal);
+ }
+ }
+
+ public static class FailedHLog implements WAL {
+ private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false);
+ private final WAL delegation;
+ FailedHLog(final WAL delegation) {
+ this.delegation = delegation;
+ }
+ @Override
+ public void registerWALActionsListener(WALActionsListener listener) {
+ delegation.registerWALActionsListener(listener);
+ }
+
+ @Override
+ public boolean unregisterWALActionsListener(WALActionsListener listener) {
+ return delegation.unregisterWALActionsListener(listener);
+ }
+
+ @Override
+ public byte[][] rollWriter() throws FailedLogCloseException, IOException {
+ return delegation.rollWriter();
+ }
+
+ @Override
+ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
+ return delegation.rollWriter(force);
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ delegation.shutdown();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegation.close();
+ }
+
+ @Override
+ public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
+ return delegation.append(htd, info, key, edits, inMemstore);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ delegation.sync();
+ }
+
+ @Override
+ public void sync(long txid) throws IOException {
+ if (SHOULD_FAIL.get()) {
+ throw new IOException("[TESTING] we need the failure!!!");
+ }
+ delegation.sync(txid);
+ }
+
+ @Override
+ public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
+ return delegation.startCacheFlush(encodedRegionName, families);
+ }
+
+ @Override
+ public void completeCacheFlush(byte[] encodedRegionName) {
+ delegation.completeCacheFlush(encodedRegionName);
+ }
+
+ @Override
+ public void abortCacheFlush(byte[] encodedRegionName) {
+ delegation.abortCacheFlush(encodedRegionName);
+ }
+
+ @Override
+ public WALCoprocessorHost getCoprocessorHost() {
+ return delegation.getCoprocessorHost();
+ }
+
+ @Override
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
+ return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
+ }
+
+ @Override
+ public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+ return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed023058/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 32adf5b..7cb74b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -866,7 +866,7 @@ public class TestDefaultMemStore extends TestCase {
kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
l.add(kv1); l.add(kv2); l.add(kv3);
- this.memstore.upsert(l, 2);// readpoint is 2
+ this.memstore.upsert(l, 2, null);// readpoint is 2
long newSize = this.memstore.size.get();
assert(newSize > oldSize);
//The kv1 should be removed.
@@ -875,7 +875,7 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setSequenceId(1);
l.clear(); l.add(kv4);
- this.memstore.upsert(l, 3);
+ this.memstore.upsert(l, 3, null);
assertEquals(newSize, this.memstore.size.get());
//The kv2 should be removed.
assert(memstore.cellSet.size() == 2);
@@ -919,7 +919,7 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
kv1.setSequenceId(100);
l.add(kv1);
- memstore.upsert(l, 1000);
+ memstore.upsert(l, 1000, null);
t = memstore.timeOfOldestEdit();
assertTrue(t == 1234);
} finally {