You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/13 21:10:29 UTC
[18/22] hbase git commit: HBASE-15252 Data loss when replaying wal if
HDFS timeout
HBASE-15252 Data loss when replaying wal if HDFS timeout
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/85e1d9a1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/85e1d9a1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/85e1d9a1
Branch: refs/heads/hbase-12439
Commit: 85e1d9a109341c5f4aabb0e82c96ab52e99a6d72
Parents: 12982d1
Author: zhangduo <zh...@apache.org>
Authored: Fri Feb 12 08:17:10 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Feb 12 16:16:37 2016 +0800
----------------------------------------------------------------------
.../regionserver/wal/ProtobufLogReader.java | 3 +-
.../hbase/regionserver/wal/TestWALReplay.java | 113 ++++++++++++++++++-
2 files changed, 112 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/85e1d9a1/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index dc5c9cc..bb25aa1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
* A Protobuf based WAL has the following structure:
@@ -332,7 +333,7 @@ public class ProtobufLogReader extends ReaderBase {
}
ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
(int)size);
- } catch (IOException ipbe) {
+ } catch (InvalidProtocolBufferException ipbe) {
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
originalPosition + ", currentPosition=" + this.inputStream.getPos() +
", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
http://git-wip-us.apache.org/repos/asf/hbase/blob/85e1d9a1/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index f004aeb..40e5baa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -22,9 +22,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.FilterInputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -91,6 +98,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -100,6 +108,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Test replay of edits out of a WAL split.
@@ -499,7 +509,7 @@ public class TestWALReplay {
boolean first = true;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
- if (first ) {
+ if (first) {
// If first, so we have at least one family w/ different seqid to rest.
region.flush(true);
first = false;
@@ -819,9 +829,9 @@ public class TestWALReplay {
final Configuration newConf = HBaseConfiguration.create(this.conf);
User user = HBaseTestingUtility.getDifferentUser(newConf,
".replay.wal.secondtime");
- user.runAs(new PrivilegedExceptionAction() {
+ user.runAs(new PrivilegedExceptionAction<Void>() {
@Override
- public Object run() throws Exception {
+ public Void run() throws Exception {
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
// 100k seems to make for about 4 flushes during HRegion#initialize.
@@ -927,6 +937,103 @@ public class TestWALReplay {
lastestSeqNumber, editCount);
}
+ /**
+ * Testcase for https://issues.apache.org/jira/browse/HBASE-15252 (data loss when replaying the
+ * WAL if reading the edit file fails).
+ * <p>
+ * Writes edits to a region, closes the region, splits the WAL, and then reopens the region
+ * through a spied FileSystem. The spy wraps the stream returned for the split edit file so that
+ * read(byte[], int, int) throws IOException("read over limit") once the bytes read reach the
+ * position recorded right after the reader was initialized. That open must either fail with
+ * exactly that message or return all the edits; a final open on the real FileSystem must
+ * return all the edits.
+ */
+ @Test
+ public void testDatalossWhenInputError() throws IOException, InstantiationException,
+ IllegalAccessException {
+ final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
+ final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+ final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
+ deleteDir(basedir);
+ final byte[] rowName = tableName.getName();
+ final int countPerFamily = 10;
+ final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+ HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+ Path regionDir = region1.getRegionFileSystem().getRegionDir();
+ HBaseTestingUtility.closeRegionAndWAL(region1);
+
+ WAL wal = createWAL(this.conf);
+ HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
+ }
+ // Sanity check: every edit written above is readable from the region before it is closed.
+ final Get g = new Get(rowName);
+ Result result = region.get(g);
+ assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+ // Now close the region (without flush), split the log, then reopen the region and assert
+ // that replaying the log has the correct effect.
+ region.close(true);
+ wal.shutdown();
+
+ runWALSplit(this.conf);
+
+ // Make the DFSInputStream throw an IOException just after the WALHeader; measure it first.
+ Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
+ FSDataInputStream stream = fs.open(editFile);
+ stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
+ Class<? extends DefaultWALProvider.Reader> logReaderClass =
+ conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+ DefaultWALProvider.Reader.class);
+ DefaultWALProvider.Reader reader = logReaderClass.newInstance();
+ reader.init(this.fs, editFile, conf, stream);
+ final long headerLength = stream.getPos(); // position after init; used as the read limit below
+ reader.close();
+ FileSystem spyFs = spy(this.fs);
+ doAnswer(new Answer<FSDataInputStream>() {
+
+ @Override
+ public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+ FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
+ Field field = FilterInputStream.class.getDeclaredField("in"); // wrapped stream, swapped below
+ field.setAccessible(true);
+ final DFSInputStream in = (DFSInputStream) field.get(stream);
+ DFSInputStream spyIn = spy(in);
+ doAnswer(new Answer<Integer>() {
+
+ private long pos; // total bytes returned so far by the spied read
+
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ if (pos >= headerLength) {
+ throw new IOException("read over limit");
+ }
+ int b = (Integer) invocation.callRealMethod();
+ if (b > 0) {
+ pos += b;
+ }
+ return b;
+ }
+ }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ invocation.callRealMethod();
+ in.close(); // also close the original stream the spy was created from
+ return null;
+ }
+ }).when(spyIn).close();
+ field.set(stream, spyIn);
+ return stream;
+ }
+ }).when(spyFs).open(eq(editFile));
+
+ WAL wal2 = createWAL(this.conf);
+ HRegion region2;
+ try {
+ // Log replay should fail with the injected IOException; if it succeeds, no edit may be lost.
+ region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
+ assertEquals(result.size(), region2.get(g).size());
+ } catch (IOException e) {
+ assertEquals("read over limit", e.getMessage());
+ }
+ region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
+ assertEquals(result.size(), region2.get(g).size());
+ }
+
static class MockWAL extends FSHLog {
boolean doCompleteCacheFlush = false;