Posted to commits@hbase.apache.org by zh...@apache.org on 2019/08/05 09:31:41 UTC

[hbase] branch branch-2.0 updated: HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7b2c3f6  HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
7b2c3f6 is described below

commit 7b2c3f601308ded263e82356389ffae39fe958ec
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Aug 5 16:19:05 2019 +0800

    HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
    
    Signed-off-by: Zheng Hu <op...@gmail.com>
---
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |  49 +++++++--
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   8 +-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |   4 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   6 +-
 .../hadoop/hbase/regionserver/wal/FSWALEntry.java  |  38 ++++---
 .../regionserver/wal/AbstractTestWALReplay.java    |   5 +-
 .../hbase/regionserver/wal/TestAsyncFSWAL.java     |   2 +-
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |   2 +-
 ...syncFSWALCorruptionDueToDanglingByteBuffer.java | 111 +++++++++++++++++++++
 ...estFSHLogCorruptionDueToDanglingByteBuffer.java |  95 ++++++++++++++++++
 ...LCorruptionDueToDanglingByteBufferTestBase.java |  92 +++++++++++++++++
 11 files changed, 379 insertions(+), 33 deletions(-)
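
For readers skimming the patch below: the fix keeps the RPC request buffers alive until the WAL entry that references them has been synced, so pooled DirectByteBuffers cannot be returned and re-used while the WAL consumer is still reading the cells. What follows is a standalone, illustrative sketch of the two-bit reference scheme the patch adds to ServerCall; it is not part of the commit, and the class name TwoBitRefSketch plus the Runnable-based cleanup are simplifications. Bit 0 stands for the normal RPC cleanup reference, bit 1 for the WAL reference, and the buffers are only released once both bits are cleared.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only (not part of the commit) of the two-bit reference
// scheme added to ServerCall: bit 0 = normal RPC cleanup, bit 1 = WAL reference.
// reqCleanup (returning the request buffers to the pool) runs only when both
// bits have been cleared, and release() tolerates being called more than once.
public class TwoBitRefSketch {
  private final AtomicInteger reference = new AtomicInteger(0b01);
  private final Runnable reqCleanup;

  TwoBitRefSketch(Runnable reqCleanup) {
    this.reqCleanup = reqCleanup;
  }

  void retainByWAL() {
    reference.updateAndGet(ref -> ref | 0b10); // set the WAL bit
  }

  void releaseByWAL() {
    release(0b10);
  }

  void cleanup() { // normal RPC-side cleanup path
    release(0b01);
  }

  private void release(int mask) {
    for (;;) {
      int ref = reference.get();
      if ((ref & mask) == 0) {
        return; // this holder already released; safe against repeated cleanup()
      }
      int next = ref & ~mask;
      if (reference.compareAndSet(ref, next)) {
        if (next == 0) {
          reqCleanup.run(); // last holder gone: buffers go back to the pool
        }
        return;
      }
    }
  }

  public static void main(String[] args) {
    TwoBitRefSketch call = new TwoBitRefSketch(() -> System.out.println("buffers released"));
    call.retainByWAL();  // WAL entry pins the request buffers
    call.cleanup();      // RPC finishes, but buffers stay pinned for the WAL
    call.releaseByWAL(); // sync completed: only now are the buffers released
  }
}

In the actual patch below, AbstractFSWAL captures the current ServerCall when it builds an FSWALEntry, the FSWALEntry constructor calls retainByWAL(), and entry.release() (which delegates to releaseByWAL()) is invoked from AsyncFSWAL.syncCompleted and from FSHLog's ring buffer event handler once the edit is safely synced.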

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index cf1cf9a..881828b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.StringUtils;
  * the result.
  */
 @InterfaceAudience.Private
-abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
+public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
 
   protected final int id;                             // the client's call id
   protected final BlockingService service;
@@ -91,8 +91,14 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-      justification="Can't figure why this complaint is happening... see below")
+  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
+  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
+  // why we can not use a general reference counting is that, we may call cleanup multiple times in
+  // the current implementation. We should fix this in the future.
+  private final AtomicInteger reference = new AtomicInteger(0b01);
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+      justification = "Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
       Message param, CellScanner cellScanner, T connection, long size,
       InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
@@ -141,14 +147,43 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
     cleanup();
   }
 
+  private void release(int mask) {
+    for (;;) {
+      int ref = reference.get();
+      if ((ref & mask) == 0) {
+        return;
+      }
+      int nextRef = ref & (~mask);
+      if (reference.compareAndSet(ref, nextRef)) {
+        if (nextRef == 0) {
+          if (this.reqCleanup != null) {
+            this.reqCleanup.run();
+          }
+        }
+        return;
+      }
+    }
+  }
+
   @Override
   public void cleanup() {
-    if (this.reqCleanup != null) {
-      this.reqCleanup.run();
-      this.reqCleanup = null;
+    release(0b01);
+  }
+
+  public void retainByWAL() {
+    for (;;) {
+      int ref = reference.get();
+      int nextRef = ref | 0b10;
+      if (reference.compareAndSet(ref, nextRef)) {
+        return;
+      }
     }
   }
 
+  public void releaseByWAL() {
+    release(0b10);
+  }
+
   @Override
   public String toString() {
     return toShortString() + " param: " +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 81ef16f..c528239 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -894,7 +896,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * Exposed for testing only. Use to tricks like halt the ring buffer appending.
    */
   @VisibleForTesting
-  void atHeadOfRingBufferEventHandlerAppend() {
+  protected void atHeadOfRingBufferEventHandlerAppend() {
     // Noop
   }
 
@@ -970,8 +972,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
+    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
     try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
-      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
       entry.stampRegionSequenceId(we);
       ringBuffer.get(txid).load(entry);
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c7f1a91..293a3ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -322,7 +322,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
-      if (iter.next().getTxid() <= processedTxid) {
+      FSWALEntry entry = iter.next();
+      if (entry.getTxid() <= processedTxid) {
+        entry.release();
         iter.remove();
       } else {
         break;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 9c730be..2052b6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
@@ -34,7 +33,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +56,7 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -951,7 +950,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
           //TODO handle htrace API change, see HBASE-18895
           //TraceScope scope = Trace.continueSpan(entry.detachSpan());
           try {
-
             if (this.exception != null) {
               // Return to keep processing events coming off the ringbuffer
               return;
@@ -968,6 +966,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
                     : new DamagedWALException("On sync", this.exception));
             // Return to keep processing events coming off the ringbuffer
             return;
+          } finally {
+            entry.release();
           }
         } else {
           // What is this if not an append or sync. Fail all up to this!!!
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index bee48f4..b547696 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -17,19 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static java.util.stream.Collectors.toCollection;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
   private final transient boolean inMemstore;
   private final transient RegionInfo regionInfo;
   private final transient Set<byte[]> familyNames;
+  private final transient Optional<ServerCall<?>> rpcCall;
 
-  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
-      final RegionInfo regionInfo, final boolean inMemstore) {
+  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
+    final boolean inMemstore, ServerCall<?> rpcCall) {
     super(key, edit);
     this.inMemstore = inMemstore;
     this.regionInfo = regionInfo;
     this.txid = txid;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
-      Set<byte []> families = edit.getFamilies();
-      this.familyNames = families != null? families: collectFamilies(edit.getCells());
+      Set<byte[]> families = edit.getFamilies();
+      this.familyNames = families != null ? families : collectFamilies(edit.getCells());
     } else {
-      this.familyNames = Collections.<byte[]>emptySet();
+      this.familyNames = Collections.<byte[]> emptySet();
+    }
+    this.rpcCall = Optional.ofNullable(rpcCall);
+    if (rpcCall != null) {
+      rpcCall.retainByWAL();
     }
   }
 
@@ -77,12 +80,13 @@ class FSWALEntry extends Entry {
     if (CollectionUtils.isEmpty(cells)) {
       return Collections.emptySet();
     } else {
-      return cells.stream()
-           .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
-           .collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
-           .stream()
-           .map(CellUtil::cloneFamily)
-           .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+      Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      for (Cell cell: cells) {
+        if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          set.add(CellUtil.cloneFamily(cell));
+        }
+      }
+      return set;
     }
   }
 
@@ -129,4 +133,8 @@ class FSWALEntry extends Entry {
   Set<byte[]> getFamilyNames() {
     return familyNames;
   }
+
+  void release() {
+    rpcCall.ifPresent(ServerCall::releaseByWAL);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index e602c51..b2e75b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1155,9 +1155,8 @@ public abstract class AbstractTestWALReplay {
   private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
       byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
       int index, NavigableMap<byte[], Integer> scopes) throws IOException {
-    FSWALEntry entry =
-        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
-          rowName, family, ee, index), hri, true);
+    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
+      createWALEdit(rowName, family, ee, index), hri, true, null);
     entry.stampRegionSequenceId(mvcc.begin());
     return entry;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index a0f90ba..d9fe3e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -111,7 +111,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
         suffix, GROUP, CHANNEL_CLASS) {
 
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 7baaa6c..1955974 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -87,7 +87,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
         suffix) {
 
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java
new file mode 100644
index 0000000..46aa871
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * Testcase for HBASE-22539
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
+  extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
+
+  public static final class PauseWAL extends AsyncFSWAL {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+        eventLoopGroup, channelClass);
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      if (ARRIVE != null) {
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    private EventLoopGroup eventLoopGroup;
+
+    private Class<? extends Channel> channelClass;
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+        channelClass);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java
new file mode 100644
index 0000000..73f7ad4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogCorruptionDueToDanglingByteBuffer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-22539
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestFSHLogCorruptionDueToDanglingByteBuffer
+  extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFSHLogCorruptionDueToDanglingByteBuffer.class);
+
+  public static final class PauseWAL extends FSHLog {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+      String prefix, String suffix) throws IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      if (ARRIVE != null) {
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java
new file mode 100644
index 0000000..127ed86
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALCorruptionDueToDanglingByteBufferTestBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testcase for HBASE-22539
+ */
+public abstract class WALCorruptionDueToDanglingByteBufferTestBase {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  protected static CountDownLatch ARRIVE;
+
+  protected static CountDownLatch RESUME;
+
+  protected static TableName TABLE_NAME = TableName.valueOf("Corruption");
+
+  protected static byte[] CF = Bytes.toBytes("cf");
+
+  protected static byte[] CQ = Bytes.toBytes("cq");
+
+  private byte[] getBytes(String prefix, int index) {
+    return Bytes.toBytes(String.format("%s-%08d", prefix, index));
+  }
+
+  @Test
+  public void test() throws Exception {
+    LOG.info("Stop WAL appending...");
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      LOG.info("Put 100 rows with " + Durability.ASYNC_WAL + "...");
+      for (int i = 0; i < 100; i++) {
+        table.batch(Arrays.asList(new Put(getBytes("row", i))
+          .addColumn(CF, CQ, getBytes("value", i)).setDurability(Durability.ASYNC_WAL)),
+          new Object[1]);
+      }
+      ARRIVE.await();
+      ARRIVE = null;
+      LOG.info("Resume WAL appending...");
+      RESUME.countDown();
+      LOG.info("Put a single row to force a WAL sync...");
+      table.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("value")));
+      LOG.info("Abort the only region server");
+      UTIL.getMiniHBaseCluster().abortRegionServer(0);
+      LOG.info("Start a new region server");
+      UTIL.getMiniHBaseCluster().startRegionServerAndWait(30000);
+      UTIL.waitTableAvailable(TABLE_NAME);
+      LOG.info("Check if all rows are still valid");
+      for (int i = 0; i < 100; i++) {
+        Result result = table.get(new Get(getBytes("row", i)));
+        assertEquals(Bytes.toString(getBytes("value", i)), Bytes.toString(result.getValue(CF, CQ)));
+      }
+      Result result = table.get(new Get(Bytes.toBytes("row")));
+      assertEquals("value", Bytes.toString(result.getValue(CF, CQ)));
+    }
+  }
+}