Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/01 00:43:00 UTC

[hbase] branch branch-2.4 updated (b9b075f -> f5af761)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from b9b075f  HBASE-26476 Make DefaultMemStore extensible for HStore.memstore (#3869)
     new 3e14c8d  HBASE-26485 Introduce a method to clean restore directory after Snapshot Scan (#3877)
     new f5af761  HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hbase/mapreduce/TableSnapshotInputFormat.java  |  10 +
 .../mapreduce/TableSnapshotInputFormatImpl.java    |  21 +++
 .../mapreduce/TestTableSnapshotInputFormat.java    |  18 ++
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  23 ++-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |   4 +-
 .../regionserver/wal/TestAsyncFSWALRollStuck.java  | 205 +++++++++++++++++++++
 6 files changed, 278 insertions(+), 3 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java

[hbase] 01/02: HBASE-26485 Introduce a method to clean restore directory after Snapshot Scan (#3877)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3e14c8d6e68dd9eb9da8921a0ffdd145732bd2b3
Author: Ruanhui <32...@users.noreply.github.com>
AuthorDate: Sat Nov 27 20:35:24 2021 +0800

    HBASE-26485 Introduce a method to clean restore directory after Snapshot Scan (#3877)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/mapreduce/TableSnapshotInputFormat.java   | 10 ++++++++++
 .../mapreduce/TableSnapshotInputFormatImpl.java     | 21 +++++++++++++++++++++
 .../mapreduce/TestTableSnapshotInputFormat.java     | 18 ++++++++++++++++++
 3 files changed, 49 insertions(+)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 24cbfcc..6fd0b6e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -235,4 +235,14 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
      TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
              splitAlgo, numSplitsPerRegion);
    }
+
+  /**
+   * Clean the restore directory after a snapshot scan job.
+   * @param job the snapshot scan job
+   * @param snapshotName the name of the snapshot to read from
+   * @throws IOException if an error occurs
+   */
+  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
+    TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
+  }
 }
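
For illustration, a minimal driver sketch using the new public hook above: run a snapshot-backed scan job, then release the restored files once it completes. This is not part of the patch — the snapshot name, restore path, and MySnapshotMapper are placeholders; only cleanRestoreDir is new here, and initTableSnapshotMapperJob is the existing API (used the same way in the test below).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotScanDriver {

      // trivial placeholder mapper; a real job would override map()
      static class MySnapshotMapper
          extends TableMapper<ImmutableBytesWritable, NullWritable> {
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "snapshot-scan");
        Path restoreDir = new Path("/tmp/snapshot-restore"); // hypothetical scratch dir
        TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(),
          MySnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class,
          job, true, restoreDir);
        job.waitForCompletion(true);
        // new in this change: delete the restored snapshot files under restoreDir
        TableSnapshotInputFormat.cleanRestoreDir(job, "my_snapshot");
      }
    }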
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index f83a9b9..e106b9d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -622,4 +623,24 @@ public class TableSnapshotInputFormatImpl {
     RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
     conf.set(RESTORE_DIR_KEY, restoreDir.toString());
   }
+
+  /**
+   * Clean the restore directory after a snapshot scan job.
+   * @param job the snapshot scan job
+   * @param snapshotName the name of the snapshot to read from
+   * @throws IOException if an error occurs
+   */
+  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
+    Configuration conf = job.getConfiguration();
+    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+    FileSystem fs = restoreDir.getFileSystem(conf);
+    if (!fs.exists(restoreDir)) {
+      LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);
+      return;
+    }
+    if (!fs.delete(restoreDir, true)) {
+      LOG.warn("Failed clean restore dir {} for snapshot {}", restoreDir, snapshotName);
+    }
+    LOG.debug("Clean restore directory {} for {}", restoreDir,  snapshotName);
+  }
 }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index b1a07f0..188fc1f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -575,4 +575,22 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
     testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
   }
+
+  @Test
+  public void testCleanRestoreDir() throws Exception {
+    TableName tableName = TableName.valueOf("test_table");
+    String snapshotName = "test_snapshot";
+    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+    Job job = Job.getInstance(UTIL.getConfiguration());
+    Path workingDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+      NullWritable.class, job, false, workingDir);
+    FileSystem fs = workingDir.getFileSystem(job.getConfiguration());
+    Path restorePath = new Path(job.getConfiguration()
+      .get("hbase.TableSnapshotInputFormat.restore.dir"));
+    Assert.assertTrue(fs.exists(restorePath));
+    TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
+    Assert.assertFalse(fs.exists(restorePath));
+  }
 }

[hbase] 02/02: HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f5af7611c0dbbb5ea7d65ef5d916fa363aa0e82c
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 1 08:24:46 2021 +0800

    HBASE-25905 Shutdown of WAL stuck at waitForSafePoint (#3898)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  23 ++-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |   4 +-
 .../regionserver/wal/TestAsyncFSWALRollStuck.java  | 205 +++++++++++++++++++++
 3 files changed, 229 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index ae26a47..f5619d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -462,12 +462,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     }
   }
 
+  // confirm non-empty before calling
+  private static long getLastTxid(Deque<FSWALEntry> queue) {
+    return queue.peekLast().getTxid();
+  }
+
   private void appendAndSync() {
     final AsyncWriter writer = this.writer;
     // maybe a sync request is not queued when we issue a sync, so check here to see if we could
     // finish some.
     finishSync(false);
     long newHighestProcessedAppendTxid = -1L;
+    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is
+    // single threaded, so this could save us some cycles
+    boolean addedToUnackedAppends = false;
     for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
       FSWALEntry entry = iter.next();
       boolean appended;
@@ -481,10 +489,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       if (appended) {
         // This is possible, when we fail to sync, we will add the unackedAppends back to
         // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
-        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
+        if (addedToUnackedAppends || unackedAppends.isEmpty() ||
+          getLastTxid(unackedAppends) < entry.getTxid()) {
           unackedAppends.addLast(entry);
+          addedToUnackedAppends = true;
         }
-        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+        // See HBASE-25905: here we need to make sure that we always write out all the entries
+        // in unackedAppends. The code in the consume method assumes that the entries in
+        // unackedAppends have all been sent out, so if there is a roll request and
+        // unackedAppends is not empty, it can simply return, as a later syncCompleted call will
+        // clear unackedAppends, or a syncFailed will lead us to another state.
+        // There could be other ways to fix this, such as changing the logic in the consume
+        // method, but that would break the assumption and (may) lead to a big refactoring.
+        // So let's fix it this way first; we can optimize later.
+        if (writer.getLength() - fileLengthAtLastSync >= batchSize &&
+          (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))) {
           break;
         }
       }
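
To make the comment above concrete, here is a toy model of the stuck scenario — not HBase code, all names are illustrative. After a failed sync the same txids sit in both toWriteAppends and unackedAppends; with the batch size effectively always exceeded, the old condition broke out of the loop before rewriting everything already in unackedAppends, so a concurrent roll waited on a sync that would never come.

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    public class StrandedAppendsSketch {
      public static void main(String[] args) {
        // after a failed sync, txids 1..3 were re-queued and are in both deques
        Deque<Long> toWrite = new ArrayDeque<>(Arrays.asList(1L, 2L, 3L));
        Deque<Long> unacked = new ArrayDeque<>(Arrays.asList(1L, 2L, 3L));
        boolean addedToUnacked = false; // nothing newly appended this round
        long written = 0;
        for (Long txid : toWrite) {
          written = txid;
          boolean batchFull = true; // batchSize 1, so every append fills the batch
          // old condition: if (batchFull) break;  -> stops after txid 1, leaving
          // txids 2 and 3 in unacked with no pending sync, so rollWriter hangs
          if (batchFull && (addedToUnacked || txid >= unacked.peekLast())) {
            break; // fixed: break only once we have reached the last unacked txid
          }
        }
        System.out.println("wrote up to txid " + written); // prints 3, not 1
      }
    }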
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 377f6a4..4bef4bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -51,6 +51,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWALProvider.class);
 
+  public static final String WRITER_IMPL = "hbase.regionserver.hlog.async.writer.impl";
+
   // Only public so classes back in regionserver.wal can access
   public interface AsyncWriter extends WALProvider.AsyncWriter {
     /**
@@ -106,7 +108,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       Class<? extends Channel> channelClass) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
-      "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
+      WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
           .newInstance(eventLoopGroup, channelClass);
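
The extracted constant gives tests (and, in principle, operators) a stable handle for swapping in a writer implementation; the new test below uses it exactly this way. A minimal sketch, with MyAsyncWriter standing in for any AsyncProtobufLogWriter subclass:

    // plug a custom writer into the provider via the new constant
    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, MyAsyncWriter.class,
      AsyncFSWALProvider.AsyncWriter.class);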
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
new file mode 100644
index 0000000..c2c7c51
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Testcase for HBASE-25905
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALRollStuck {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
+
+  private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
+
+  private static ScheduledExecutorService EXECUTOR;
+
+  private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue<>(3);
+
+  private static AtomicInteger SYNC_COUNT = new AtomicInteger(0);
+
+  private static CountDownLatch ARRIVE = new CountDownLatch(1);
+
+  private static CountDownLatch RESUME = new CountDownLatch(1);
+
+  public static final class TestAsyncWriter extends AsyncProtobufLogWriter {
+
+    public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
+      super(eventLoopGroup, channelClass);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync(boolean forceSync) {
+      int count = SYNC_COUNT.incrementAndGet();
+      if (count < 3) {
+        // we will mark these two futures as failed, to make sure that we have 2 edits in
+        // unackedAppends, and trigger a WAL roll
+        CompletableFuture<Long> f = new CompletableFuture<>();
+        FUTURES.offer(f);
+        return f;
+      } else if (count == 3) {
+        // for this future, we will mark it as succeeded, but before returning from this method we
+        // need to request a roll, to enter the special corner case where we have left some edits
+        // in unackedAppends but never try to write them out, which leads to a hang
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+        }
+        return super.sync(forceSync);
+      } else {
+        return super.sync(forceSync);
+      }
+    }
+  }
+
+  private static TableName TN;
+
+  private static RegionInfo RI;
+
+  private static MultiVersionConcurrencyControl MVCC;
+
+  private static AsyncFSWAL WAL;
+
+  private static ExecutorService ROLL_EXEC;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(AsyncFSWALProvider.WRITER_IMPL, TestAsyncWriter.class, AsyncWriter.class);
+    // set a very small size so we will reach the batch size when writing out a single edit
+    conf.setLong(AsyncFSWAL.WAL_BATCH_SIZE, 1);
+
+    TN = TableName.valueOf("test");
+    RI = RegionInfoBuilder.newBuilder(TN).build();
+    MVCC = new MultiVersionConcurrencyControl();
+
+    EXECUTOR =
+      Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
+
+    Path rootDir = UTIL.getDataTestDir();
+    ROLL_EXEC =
+      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
+    WALActionsListener listener = new WALActionsListener() {
+
+      @Override
+      public void logRollRequested(RollRequestReason reason) {
+        ROLL_EXEC.execute(() -> {
+          try {
+            WAL.rollWriter();
+          } catch (Exception e) {
+            LOG.warn("failed to roll writer", e);
+          }
+        });
+      }
+
+    };
+    WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), rootDir, "log", "oldlog", conf,
+      Arrays.asList(listener), true, null, null, EVENT_LOOP_GROUP, CHANNEL_CLASS);
+    WAL.init();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EXECUTOR.shutdownNow();
+    ROLL_EXEC.shutdownNow();
+    Closeables.close(WAL, true);
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testRoll() throws Exception {
+    byte[] row = Bytes.toBytes("family");
+    WALEdit edit = new WALEdit();
+    edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(row)
+      .setQualifier(row).setRow(row).setValue(row)
+      .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Type.Put).build());
+    WALKeyImpl key1 =
+      new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
+    WAL.appendData(RI, key1, edit);
+
+    WALKeyImpl key2 = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, key1.getWriteTime() + 1, MVCC);
+    long txid = WAL.appendData(RI, key2, edit);
+
+    // we need to make sure the two edits have both been added to unackedAppends, so we have two syncs
+    UTIL.waitFor(10000, () -> FUTURES.size() == 2);
+    FUTURES.poll().completeExceptionally(new IOException("inject error"));
+    FUTURES.poll().completeExceptionally(new IOException("inject error"));
+    ARRIVE.await();
+    // resume after 1 second, to give us enough time to enter the roll state
+    EXECUTOR.schedule(() -> RESUME.countDown(), 1, TimeUnit.SECONDS);
+    // let's roll the wal; before the fix in HBASE-25905, it would hang forever inside
+    // waitForSafePoint
+    WAL.rollWriter();
+    // make sure we can finally succeed
+    WAL.sync(txid);
+  }
+}