You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/08/05 09:31:00 UTC
[hbase] branch branch-2.2 updated: HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 2806d1d HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
2806d1d is described below
commit 2806d1d8dee3c50d42b1fde72b75a7d610a944b2
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Aug 5 16:19:05 2019 +0800
HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
Signed-off-by: Zheng Hu <op...@gmail.com>
---
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 49 +++++++--
.../hbase/regionserver/wal/AbstractFSWAL.java | 8 +-
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 4 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 6 +-
.../hadoop/hbase/regionserver/wal/FSWALEntry.java | 38 ++++---
.../regionserver/wal/AbstractTestWALReplay.java | 5 +-
.../hbase/regionserver/wal/TestAsyncFSWAL.java | 2 +-
.../hadoop/hbase/regionserver/wal/TestFSHLog.java | 2 +-
...syncFSWALCorruptionDueToDanglingByteBuffer.java | 111 +++++++++++++++++++++
...estFSHLogCorruptionDueToDanglingByteBuffer.java | 95 ++++++++++++++++++
...LCorruptionDueToDanglingByteBufferTestBase.java | 92 +++++++++++++++++
11 files changed, 379 insertions(+), 33 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index cf1cf9a..881828b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.StringUtils;
* the result.
*/
@InterfaceAudience.Private
-abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
+public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
protected final int id; // the client's call id
protected final BlockingService service;
@@ -91,8 +91,19 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
- justification="Can't figure why this complaint is happening... see below")
+  // This is a dirty hack to address HBASE-22539. The lowest bit (RPC_REF) is held by the normal
+  // rpc cleanup, and the bits above it count outstanding WAL references in units of WAL_REF.
+  // AbstractFSWAL attaches the current call to each entry it appends, so several WAL entries may
+  // reference the same call at once; with a single WAL bit, the first entry to be released would
+  // free buffers the others still point at. The request is released only when the whole value
+  // reaches zero. We cannot use plain reference counting because cleanup may be called multiple
+  // times in the current implementation. We should fix this in the future.
+  private static final int RPC_REF = 0b01;
+  private static final int WAL_REF = 0b10;
+  private final AtomicInteger reference = new AtomicInteger(RPC_REF);
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+ justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, T connection, long size,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
@@ -141,14 +152,77 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
cleanup();
}
+  /**
+   * Runs {@code reqCleanup}, if set, to release the buffers backing this request. Only the thread
+   * whose compare-and-set brought {@link #reference} to zero calls this, and zero is terminal
+   * (see {@link #retainByWAL()}), so it runs at most once per call.
+   */
+  private void doCleanup() {
+    if (this.reqCleanup != null) {
+      this.reqCleanup.run();
+    }
+  }
+
+  /**
+   * Drops the rpc side's reference. The request buffers are released right away unless a WAL
+   * entry still references this call. Only the first invocation has any effect.
+   */
@Override
public void cleanup() {
- if (this.reqCleanup != null) {
- this.reqCleanup.run();
- this.reqCleanup = null;
+    for (;;) {
+      int ref = reference.get();
+      if ((ref & RPC_REF) == 0) {
+        // Already released by an earlier call; cleanup may be invoked more than once.
+        return;
+      }
+      int nextRef = ref & ~RPC_REF;
+      if (reference.compareAndSet(ref, nextRef)) {
+        if (nextRef == 0) {
+          doCleanup();
+        }
+        return;
+      }
}
}
+  /**
+   * Records one more WAL entry that references this call's buffers; each call must be matched by
+   * exactly one {@link #releaseByWAL()}. Does nothing once the call has been fully released, so
+   * the matching release cannot run the cleanup a second time.
+   */
+  public void retainByWAL() {
+    for (;;) {
+      int ref = reference.get();
+      if (ref == 0) {
+        return;
+      }
+      if (reference.compareAndSet(ref, ref + WAL_REF)) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Drops one WAL entry's reference. The request buffers are released once neither the rpc side
+   * nor any other WAL entry references this call. A release with no outstanding retain is ignored.
+   */
+  public void releaseByWAL() {
+    for (;;) {
+      int ref = reference.get();
+      if (ref < WAL_REF) {
+        // No outstanding WAL reference to drop.
+        return;
+      }
+      int nextRef = ref - WAL_REF;
+      if (reference.compareAndSet(ref, nextRef)) {
+        if (nextRef == 0) {
+          doCleanup();
+        }
+        return;
+      }
+    }
+  }
+
@Override
public String toString() {
return toShortString() + " param: " +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 43f1512..5a4ea3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -901,7 +903,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* Exposed for testing only. Use to tricks like halt the ring buffer appending.
*/
@VisibleForTesting
- void atHeadOfRingBufferEventHandlerAppend() {
+ protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}
@@ -977,8 +979,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
+ ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
+ .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
- FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+ FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 79409a0..10c1a53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -321,7 +321,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
- if (iter.next().getTxid() <= processedTxid) {
+ FSWALEntry entry = iter.next();
+ if (entry.getTxid() <= processedTxid) {
+ entry.release();
iter.remove();
} else {
break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b77210e..f10b1ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
-
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@@ -34,7 +33,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -59,6 +57,7 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -963,7 +962,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(entry.detachSpan());
try {
-
if (this.exception != null) {
// Return to keep processing events coming off the ringbuffer
return;
@@ -980,6 +978,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
+ } finally {
+ entry.release();
}
} else {
// What is this if not an append or sync. Fail all up to this!!!
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 6313212..1b44fcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -17,19 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static java.util.stream.Collectors.toCollection;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
-
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
private final transient boolean inMemstore;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
+ private final transient Optional<ServerCall<?>> rpcCall;
- FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
- final RegionInfo regionInfo, final boolean inMemstore) {
+ FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
+ final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
- Set<byte []> families = edit.getFamilies();
- this.familyNames = families != null? families: collectFamilies(edit.getCells());
+ Set<byte[]> families = edit.getFamilies();
+ this.familyNames = families != null ? families : collectFamilies(edit.getCells());
} else {
- this.familyNames = Collections.<byte[]>emptySet();
+ this.familyNames = Collections.<byte[]> emptySet();
+ }
+ this.rpcCall = Optional.ofNullable(rpcCall);
+ if (rpcCall != null) {
+ rpcCall.retainByWAL();
}
}
@@ -77,12 +80,13 @@ class FSWALEntry extends Entry {
if (CollectionUtils.isEmpty(cells)) {
return Collections.emptySet();
} else {
- return cells.stream()
- .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
- .collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
- .stream()
- .map(CellUtil::cloneFamily)
- .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+ Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (Cell cell: cells) {
+ if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ set.add(CellUtil.cloneFamily(cell));
+ }
+ }
+ return set;
}
}
@@ -129,4 +133,8 @@ class FSWALEntry extends Entry {
Set<byte[]> getFamilyNames() {
return familyNames;
}
+
+ void release() {
+ rpcCall.ifPresent(ServerCall::releaseByWAL);
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index cd8696a..7a33e53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1156,9 +1156,8 @@ public abstract class AbstractTestWALReplay {
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
- FSWALEntry entry =
- new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
- rowName, family, ee, index), hri, true);
+ FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
+ createWALEdit(rowName, family, ee, index), hri, true, null);
entry.stampRegionSequenceId(mvcc.begin());
return entry;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 9302428..effecb8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -112,7 +112,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
AsyncFSWAL asyncFSWAL = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners,
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
@Override
- void atHeadOfRingBufferEventHandlerAppend() {
+ protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 614ba79..8f6e518 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -89,7 +89,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
conf, listeners, failIfWALExists, prefix, suffix) {
@Override
- void atHeadOfRingBufferEventHandlerAppend() {
+ protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java
new file mode 100644
index 0000000..46aa871
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * Testcase for HBASE-22539, run against {@link AsyncFSWAL}. The scenario itself lives in
+ * {@link WALCorruptionDueToDanglingByteBufferTestBase}; this class only installs a WAL whose
+ * appends can be paused.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
+  extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
+
+  /**
+   * An {@link AsyncFSWAL} whose append hook blocks while the test has armed the ARRIVE latch.
+   */
+  public static final class PauseWAL extends AsyncFSWAL {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+        eventLoopGroup, channelClass);
+    }
+
+    /**
+     * If the test has armed {@code ARRIVE}, counts it down and then blocks until {@code RESUME}
+     * is counted down, holding back this and later appends.
+     */
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      // Read the latch once: the test thread sets ARRIVE to null after it fires, and the meta WAL
+      // may be a PauseWAL too (createWAL handles the meta provider id), so a second read could
+      // observe null.
+      CountDownLatch arrive = ARRIVE;
+      if (arrive != null) {
+        arrive.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+          // Stop pausing, but preserve the interrupt status for the WAL thread.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Provider creating {@link PauseWAL}s, using the netty event loop group and channel class
+   * obtained from {@link NettyAsyncFSWALConfigHelper}.
+   */
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    private EventLoopGroup eventLoopGroup;
+
+    private Class<? extends Channel> channelClass;
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+        channelClass);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java
new file mode 100644
index 0000000..73f7ad4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-22539, run against {@link FSHLog}. The scenario itself lives in
+ * {@link WALCorruptionDueToDanglingByteBufferTestBase}; this class only installs a WAL whose
+ * appends can be paused.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFSHLogCorruptionDueToDanglingByteBuffer
+  extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFSHLogCorruptionDueToDanglingByteBuffer.class);
+
+  /**
+   * An {@link FSHLog} whose append hook blocks while the test has armed the ARRIVE latch.
+   */
+  public static final class PauseWAL extends FSHLog {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix) throws IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+    }
+
+    /**
+     * If the test has armed {@code ARRIVE}, counts it down and then blocks until {@code RESUME}
+     * is counted down, holding back this and later appends.
+     */
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      // Read the latch once: the test thread sets ARRIVE to null after it fires, and the meta WAL
+      // may be a PauseWAL too (createWAL handles the meta provider id), so a second read could
+      // observe null.
+      CountDownLatch arrive = ARRIVE;
+      if (arrive != null) {
+        arrive.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+          // Stop pausing, but preserve the interrupt status for the WAL thread.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Provider creating {@link PauseWAL}s in place of the default WAL implementation.
+   */
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      // Nothing to initialize: createWAL only uses state already set up by the base provider.
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java
new file mode 100644
index 0000000..127ed86
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testcase for HBASE-22539: WAL corruption caused by request buffers being reused while an
+ * {@link Durability#ASYNC_WAL} edit that still points at them is waiting to be written.
+ * <p>
+ * Subclasses install a WAL whose append hook counts down {@link #ARRIVE} (when non-null) and then
+ * blocks on {@link #RESUME}. The test holds the WAL back this way while a batch of ASYNC_WAL puts
+ * completes, lets it resume, aborts the only region server, and checks after a restart that every
+ * row can still be read back with the expected value.
+ */
+public abstract class WALCorruptionDueToDanglingByteBufferTestBase {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WALCorruptionDueToDanglingByteBufferTestBase.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  /**
+   * Armed by {@link #test()} to pause the WAL; null otherwise. Volatile because it is written by
+   * the test thread and read by region server WAL threads.
+   */
+  protected static volatile CountDownLatch ARRIVE;
+
+  /** Counted down by {@link #test()} to let a paused WAL append continue. */
+  protected static volatile CountDownLatch RESUME;
+
+  protected static final TableName TABLE_NAME = TableName.valueOf("Corruption");
+
+  protected static final byte[] CF = Bytes.toBytes("cf");
+
+  protected static final byte[] CQ = Bytes.toBytes("cq");
+
+  private byte[] getBytes(String prefix, int index) {
+    return Bytes.toBytes(String.format("%s-%08d", prefix, index));
+  }
+
+  @Test
+  public void test() throws Exception {
+    LOG.info("Stop WAL appending...");
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      LOG.info("Put 100 rows with " + Durability.ASYNC_WAL + "...");
+      for (int i = 0; i < 100; i++) {
+        table.batch(Arrays.asList(new Put(getBytes("row", i))
+          .addColumn(CF, CQ, getBytes("value", i)).setDurability(Durability.ASYNC_WAL)),
+          new Object[1]);
+      }
+      // Wait until the WAL is parked in its append hook. The ASYNC_WAL puts above have already
+      // returned, so their edits are still pending in the WAL when it resumes.
+      ARRIVE.await();
+      ARRIVE = null;
+      LOG.info("Resume WAL appending...");
+      RESUME.countDown();
+      LOG.info("Put a single row to force a WAL sync...");
+      table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
+      LOG.info("Abort the only region server");
+      UTIL.getMiniHBaseCluster().abortRegionServer(0);
+      LOG.info("Start a new region server");
+      UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
+      UTIL.waitTableAvailable(TABLE_NAME);
+      LOG.info("Check if all rows are still valid");
+      for (int i = 0; i < 100; i++) {
+        Result result = table.get(new Get(getBytes("row", i)));
+        assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ)));
+      }
+      Result result = table.get(new Get(Bytes.toBytes("row")));
+      assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
+    }
+  }
+}