You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/01 00:43:02 UTC
[hbase] 02/02: HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit f5af7611c0dbbb5ea7d65ef5d916fa363aa0e82c
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 1 08:24:46 2021 +0800
HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)
Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 23 ++-
.../hadoop/hbase/wal/AsyncFSWALProvider.java | 4 +-
.../regionserver/wal/TestAsyncFSWALRollStuck.java | 205 +++++++++++++++++++++
3 files changed, 229 insertions(+), 3 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index ae26a47..f5619d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -462,12 +462,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
+ // confirm non-empty before calling
+ private static long getLastTxid(Deque<FSWALEntry> queue) {
+ return queue.peekLast().getTxid();
+ }
+
private void appendAndSync() {
final AsyncWriter writer = this.writer;
// maybe a sync request is not queued when we issue a sync, so check here to see if we could
// finish some.
finishSync(false);
long newHighestProcessedAppendTxid = -1L;
+ // this is used to avoid calling peedLast every time on unackedAppends, appendAndAsync is single
+ // threaded, this could save us some cycles
+ boolean addedToUnackedAppends = false;
for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
FSWALEntry entry = iter.next();
boolean appended;
@@ -481,10 +489,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
if (appended) {
// This is possible, when we fail to sync, we will add the unackedAppends back to
// toWriteAppends, so here we may get an entry which is already in the unackedAppends.
- if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
+ if (addedToUnackedAppends || unackedAppends.isEmpty() ||
+ getLastTxid(unackedAppends) < entry.getTxid()) {
unackedAppends.addLast(entry);
+ addedToUnackedAppends = true;
}
- if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+ // See HBASE-25905, here we need to make sure that, we will always write all the entries in
+ // unackedAppends out. As the code in the consume method will assume that, the entries in
+ // unackedAppends have all been sent out so if there is roll request and unackedAppends is
+ // not empty, we could just return as later there will be a syncCompleted call to clear the
+ // unackedAppends, or a syncFailed to lead us to another state.
+ // There could be other ways to fix, such as changing the logic in the consume method, but
+ // it will break the assumption and then (may) lead to a big refactoring. So here let's use
+ // this way to fix first, can optimize later.
+ if (writer.getLength() - fileLengthAtLastSync >= batchSize &&
+ (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) {
break;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 377f6a4..4bef4bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -51,6 +51,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
+ public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";
+
// Only public so classes back in regionserver.wal can access
public interface AsyncWriter extends WALProvider.AsyncWriter {
/**
@@ -106,7 +108,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
Class<? extends Channel> channelClass) throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
- "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
+ WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
.newInstance(eventLoopGroup, channelClass);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
new file mode 100644
index 0000000..c2c7c51
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Testcase for HBASE-25905
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALRollStuck {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
+
+ private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
+
+ private static ScheduledExecutorService EXECUTOR;
+
+ private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3);
+
+ private static AtomicInteger SYNC_COUNT = new AtomicInteger(0);
+
+ private static CountDownLatch ARRIVE = new CountDownLatch(1);
+
+ private static CountDownLatch RESUME = new CountDownLatch(1);
+
+ public static final class TestAsyncWriter extends AsyncProtobufLogWriter {
+
+ public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
+ super(eventLoopGroup, channelClass);
+ }
+
+ @Override
+ public CompletableFuture<Long> sync(boolean forceSync) {
+ int count = SYNC_COUNT.incrementAndGet();
+ if (count < 3) {
+ // we will mark these two futures as failure, to make sure that we have 2 edits in
+ // unackedAppends, and trigger a WAL roll
+ CompletableFuture<Long> f = new CompletableFuture<>();
+ FUTURES.offer(f);
+ return f;
+ } else if (count == 3) {
+ // for this future, we will mark it as succeeded, but before returning from this method, we
+ // need to request a roll, to enter the special corner case, where we have left some edits
+ // in unackedAppends but never tries to write them out which leads to a hang
+ ARRIVE.countDown();
+ try {
+ RESUME.await();
+ } catch (InterruptedException e) {
+ }
+ return super.sync(forceSync);
+ } else {
+ return super.sync(forceSync);
+ }
+ }
+ }
+
+ private static TableName TN;
+
+ private static RegionInfo RI;
+
+ private static MultiVersionConcurrencyControl MVCC;
+
+ private static AsyncFSWAL WAL;
+
+ private static ExecutorService ROLL_EXEC;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
+ // set a very small size so we will reach the batch size when writing out a single edit
+ conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);
+
+ TN = TableName.valueOf("test");
+ RI = RegionInfoBuilder.newBuilder(TN).build();
+ MVCC = new MultiVersionConcurrencyControl();
+
+ EXECUTOR =
+ Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
+
+ Path rootDir = UTIL.getDataTestDir();
+ ROLL_EXEC =
+ Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
+ WALActionsListener listener = new WALActionsListener() {
+
+ @Override
+ public void logRollRequested(RollRequestReason reason) {
+ ROLL_EXEC.execute(() -> {
+ try {
+ WAL.rollWriter();
+ } catch (Exception e) {
+ LOG.warn("failed to roll writer", e);
+ }
+ });
+ }
+
+ };
+ WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
+ Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
+ WAL.init();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EXECUTOR.shutdownNow();
+ ROLL_EXEC.shutdownNow();
+ Closeables.close(WAL, true);
+ UTIL.cleanupTestDir();
+ }
+
+ @Test
+ public void testRoll() throws Exception {
+ byte[] row = Bytes.toBytes("family");
+ WALEdit edit = new WALEdit();
+ edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
+ .setQualifier(row).setRow(row).setValue(row)
+ .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
+ WALKeyImpl key1 =
+ new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
+ WAL.appendData(RI, key1, edit);
+
+ WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
+ long txid = WAL.appendData(RI, key2, edit);
+
+ // we need to make sure the two edits have both been added unackedAppends, so we have two syncs
+ UTIL.waitFor(10000, () -> FUTURES.size() == 2);
+ FUTURES.poll().completeExceptionally(new IOException("inject error"));
+ FUTURES.poll().completeExceptionally(new IOException("inject error"));
+ ARRIVE.await();
+ // resume after 1 seconds, to give us enough time to enter the roll state
+ EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
+ // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
+ // waitForSafePoint
+ WAL.rollWriter();
+ // make sure we can finally succeed
+ WAL.sync(txid);
+ }
+}