You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/01 00:43:02 UTC

[hbase] 02/02: HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f5af7611c0dbbb5ea7d65ef5d916fa363aa0e82c
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 1 08:24:46 2021 +0800

    HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  23 ++-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |   4 +-
 .../regionserver/wal/TestAsyncFSWALRollStuck.java  | 205 +++++++++++++++++++++
 3 files changed, 229 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index ae26a47..f5619d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -462,12 +462,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
+  // Returns the last entry's txid; queue must be non-empty (peekLast() returns null if empty).
+  private static long getLastTxid(Deque<FSWALEntry> queue) {
+    return queue.peekLast().getTxid();
+  }
+
   private void appendAndSync() {
     final AsyncWriter writer = this.writer;
     // maybe a sync request is not queued when we issue a sync, so check here to see if we could
     // finish some.
     finishSync(false);
     long newHighestProcessedAppendTxid = -1L;
+    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
+    // threaded, so this could save us some cycles
+    boolean addedToUnackedAppends = false;
     for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
       FSWALEntry entry = iter.next();
       boolean appended;
@@ -481,10 +489,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       if (appended) {
         // This is possible, when we fail to sync, we will add the unackedAppends back to
         // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
-        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
+        if (addedToUnackedAppends || unackedAppends.isEmpty() ||
+          getLastTxid(unackedAppends) < entry.getTxid()) {
           unackedAppends.addLast(entry);
+          addedToUnackedAppends = true;
         }
-        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+        // See HBASE-25905: we must make sure that all the entries in unackedAppends are always
+        // written out. The code in the consume method assumes that every entry in unackedAppends
+        // has already been sent out, so if there is a roll request while unackedAppends is not
+        // empty it can just return, since a later syncCompleted call will clear unackedAppends,
+        // or a syncFailed call will move us to another state.
+        // There are other possible fixes, such as changing the logic in the consume method, but
+        // that would break this assumption and may lead to a big refactoring. So use this fix
+        // for now; it can be optimized later.
+        if (writer.getLength() - fileLengthAtLastSync >= batchSize &&
+          (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) {
           break;
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 377f6a4..4bef4bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -51,6 +51,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
 
+  public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";
+
   // Only public so classes back in regionserver.wal can access
   public interface AsyncWriter extends WALProvider.AsyncWriter {
     /**
@@ -106,7 +108,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       Class<? extends Channel> channelClass) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
-      "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
+      WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
           .newInstance(eventLoopGroup, channelClass);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
new file mode 100644
index 0000000..c2c7c51
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Testcase for HBASE-25905: rolling the WAL must not hang forever inside waitForSafePoint.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALRollStuck {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
+
+  private static final Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
+
+  private static ScheduledExecutorService EXECUTOR;
+
+  private static final BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3);
+
+  private static final AtomicInteger SYNC_COUNT = new AtomicInteger(0);
+
+  private static final CountDownLatch ARRIVE = new CountDownLatch(1);
+
+  private static final CountDownLatch RESUME = new CountDownLatch(1);
+
+  /** First two syncs return test-controlled futures; the third blocks until RESUME. */
+  public static final class TestAsyncWriter extends AsyncProtobufLogWriter {
+    public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
+      super(eventLoopGroup, channelClass);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync(boolean forceSync) {
+      int count = SYNC_COUNT.incrementAndGet();
+      if (count < 3) {
+        // we will mark these two futures as failed, to make sure that we have 2 edits in
+        // unackedAppends, and trigger a WAL roll
+        CompletableFuture<Long> f = new CompletableFuture<>();
+        FUTURES.offer(f);
+        return f;
+      } else if (count == 3) {
+        // for this future, we will mark it as succeeded, but before returning from this method, we
+        // need to request a roll, to enter the special corner case, where we have left some edits
+        // in unackedAppends but never try to write them out, which leads to a hang
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+          // restore the interrupt status instead of silently swallowing it
+          Thread.currentThread().interrupt();
+        }
+      }
+      return super.sync(forceSync);
+    }
+  }
+
+  private static TableName TN;
+
+  private static RegionInfo RI;
+
+  private static MultiVersionConcurrencyControl MVCC;
+
+  private static AsyncFSWAL WAL;
+
+  private static ExecutorService ROLL_EXEC;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
+    // set a very small size so we will reach the batch size when writing out a single edit
+    conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);
+
+    TN = TableName.valueOf("test");
+    RI = RegionInfoBuilder.newBuilder(TN).build();
+    MVCC = new MultiVersionConcurrencyControl();
+
+    EXECUTOR =
+      Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
+
+    Path rootDir = UTIL.getDataTestDir();
+    ROLL_EXEC =
+      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
+    WALActionsListener listener = new WALActionsListener() {
+
+      @Override
+      public void logRollRequested(RollRequestReason reason) {
+        ROLL_EXEC.execute(() -> {
+          try {
+            WAL.rollWriter();
+          } catch (Exception e) {
+            LOG.warn("failed to roll writer", e);
+          }
+        });
+      }
+    };
+    WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
+      Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
+    WAL.init();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EXECUTOR.shutdownNow();
+    ROLL_EXEC.shutdownNow();
+    Closeables.close(WAL, true);
+    EVENT_LOOP_GROUP.shutdownGracefully();
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testRoll() throws Exception {
+    byte[] row = Bytes.toBytes("family");
+    WALEdit edit = new WALEdit();
+    edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
+      .setQualifier(row).setRow(row).setValue(row)
+      .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
+    WALKeyImpl key1 =
+      new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
+    WAL.appendData(RI, key1, edit);
+
+    WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
+    long txid = WAL.appendData(RI, key2, edit);
+
+    // make sure both edits have been added to unackedAppends, i.e. we have seen two syncs
+    UTIL.waitFor(10000, () -> FUTURES.size() == 2);
+    FUTURES.poll().completeExceptionally(new IOException("inject error"));
+    FUTURES.poll().completeExceptionally(new IOException("inject error"));
+    ARRIVE.await();
+    // resume after 1 second, to give us enough time to enter the roll state
+    EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
+    // let's roll the wal, before the fix in HBASE-25905, it will hang forever inside
+    // waitForSafePoint
+    WAL.rollWriter();
+    // make sure we can finally succeed
+    WAL.sync(txid);
+  }
+}