You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by an...@apache.org on 2020/07/28 04:37:56 UTC

[hbase] branch branch-2.3 updated: HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll #2155

This is an automated email from the ASF dual-hosted git repository.

anoopsamjohn pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 7ef1aca  HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll #2155
7ef1aca is described below

commit 7ef1aca01af370e506986aa1c3769e68eb8bebdf
Author: WenFeiYi <we...@gmail.com>
AuthorDate: Tue Jul 28 12:37:37 2020 +0800

    HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll #2155
    
    Co-authored-by: wen_yi <li...@immomo.com>
    Signed-off-by: Anoop <an...@apache.org>
    Signed-off-by: Ramkrishna <ra...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hadoop/hbase/regionserver/LogRoller.java       |   4 +-
 .../apache/hadoop/hbase/wal/AbstractWALRoller.java | 119 +++++++++++++--------
 .../hadoop/hbase/regionserver/TestLogRoller.java   | 117 ++++++++++++++++++++
 3 files changed, 196 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f5049c9..992b117 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -63,7 +63,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
   }
 
   @VisibleForTesting
-  Map<WAL, Boolean> getWalNeedsRoll() {
-    return this.walNeedsRoll;
+  Map<WAL, RollController> getWalNeedsRoll() {
+    return this.wals;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 3154c19..8aa8a63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.wal;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.ConnectException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
@@ -56,31 +55,31 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
 
   protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
 
-  protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
+  protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
   protected final T abortable;
-  private volatile long lastRollTime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollPeriod;
   private final int threadWakeFrequency;
   // The interval to check low replication on hlog's pipeline
-  private long checkLowReplicationInterval;
+  private final long checkLowReplicationInterval;
 
   private volatile boolean running = true;
 
   public void addWAL(WAL wal) {
     // check without lock first
-    if (walNeedsRoll.containsKey(wal)) {
+    if (wals.containsKey(wal)) {
       return;
     }
     // this is to avoid race between addWAL and requestRollAll.
     synchronized (this) {
-      if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
+      if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
         wal.registerWALActionsListener(new WALActionsListener() {
           @Override
           public void logRollRequested(WALActionsListener.RollRequestReason reason) {
             // TODO logs will contend with each other here, replace with e.g. DelayedQueue
             synchronized (AbstractWALRoller.this) {
-              walNeedsRoll.put(wal, Boolean.TRUE);
+              RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
+              controller.requestRoll();
               AbstractWALRoller.this.notifyAll();
             }
           }
@@ -91,9 +90,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
 
   public void requestRollAll() {
     synchronized (this) {
-      List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
-      for (WAL wal : wals) {
-        walNeedsRoll.put(wal, Boolean.TRUE);
+      for (RollController controller : wals.values()) {
+        controller.requestRoll();
       }
       notifyAll();
     }
@@ -113,9 +111,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
    */
   private void checkLowReplication(long now) {
     try {
-      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+      for (Entry<WAL, RollController> entry : wals.entrySet()) {
         WAL wal = entry.getKey();
-        boolean needRollAlready = entry.getValue();
+        boolean needRollAlready = entry.getValue().needsRoll(now);
         if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
           continue;
         }
@@ -131,7 +129,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
     // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
     // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
     // is already broken.
-    for (WAL wal : walNeedsRoll.keySet()) {
+    for (WAL wal : wals.keySet()) {
       // shutdown rather than close here since we are going to abort the RS and the wals need to be
       // split when recovery
       try {
@@ -146,42 +144,39 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
   @Override
   public void run() {
     while (running) {
-      boolean periodic = false;
       long now = System.currentTimeMillis();
       checkLowReplication(now);
-      periodic = (now - this.lastRollTime) > this.rollPeriod;
-      if (periodic) {
-        // Time for periodic roll, fall through
-        LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
-      } else {
-        synchronized (this) {
-          if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
-            // WAL roll requested, fall through
-            LOG.debug("WAL roll requested");
-          } else {
-            try {
-              wait(this.threadWakeFrequency);
-            } catch (InterruptedException e) {
-              // restore the interrupt state
-              Thread.currentThread().interrupt();
-            }
-            // goto the beginning to check whether again whether we should fall through to roll
-            // several WALs, and also check whether we should quit.
-            continue;
+      synchronized (this) {
+        if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
+          try {
+            wait(this.threadWakeFrequency);
+          } catch (InterruptedException e) {
+            // restore the interrupt state
+            Thread.currentThread().interrupt();
           }
+          // goto the beginning to check whether again whether we should fall through to roll
+          // several WALs, and also check whether we should quit.
+          continue;
         }
       }
       try {
-        this.lastRollTime = System.currentTimeMillis();
-        for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
-          .hasNext();) {
-          Entry<WAL, Boolean> entry = iter.next();
+        for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
+             iter.hasNext();) {
+          Entry<WAL, RollController> entry = iter.next();
           WAL wal = entry.getKey();
-          // reset the flag in front to avoid missing roll request before we return from rollWriter.
-          walNeedsRoll.put(wal, Boolean.FALSE);
+          RollController controller = entry.getValue();
+          if (controller.isRollRequested()) {
+            // WAL roll requested, fall through
+            LOG.debug("WAL {} roll requested", wal);
+          } else if (controller.needsPeriodicRoll(now)){
+            // Time for periodic roll, fall through
+            LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
+          } else {
+            continue;
+          }
           // Force the roll if the logroll.period is elapsed or if a roll was requested.
           // The returned value is an array of actual region names.
-          byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
+          byte[][] regionsToFlush = controller.rollWal(now);
           if (regionsToFlush != null) {
             for (byte[] r : regionsToFlush) {
               scheduleFlush(Bytes.toString(r));
@@ -223,7 +218,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
    * @return true if all WAL roll finished
    */
   public boolean walRollFinished() {
-    return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
+    return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
+      && isWaiting();
   }
 
   /**
@@ -240,4 +236,43 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
     running = false;
     interrupt();
   }
+
+  /**
+   * Independently control the roll of each wal. When use multiwal,
+   * can avoid all wal roll together. see HBASE-24665 for detail
+   */
+  protected class RollController {
+    private final WAL wal;
+    private final AtomicBoolean rollRequest;
+    private long lastRollTime;
+
+    RollController(WAL wal) {
+      this.wal = wal;
+      this.rollRequest = new AtomicBoolean(false);
+      this.lastRollTime = System.currentTimeMillis();
+    }
+
+    public void requestRoll() {
+      this.rollRequest.set(true);
+    }
+
+    public byte[][] rollWal(long now) throws IOException {
+      this.lastRollTime = now;
+      // reset the flag in front to avoid missing roll request before we return from rollWriter.
+      this.rollRequest.set(false);
+      return wal.rollWriter(true);
+    }
+
+    public boolean isRollRequested() {
+      return rollRequest.get();
+    }
+
+    public boolean needsPeriodicRoll(long now) {
+      return (now - this.lastRollTime) > rollPeriod;
+    }
+
+    public boolean needsRoll(long now) {
+      return isRollRequested() || needsPeriodicRoll(now);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
new file mode 100644
index 0000000..86b9bc3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestLogRoller {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLogRoller.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final int LOG_ROLL_PERIOD = 20 * 1000;
+  private static final String LOG_DIR = "WALs";
+  private static final String ARCHIVE_DIR = "archiveWALs";
+  private static final String WAL_PREFIX = "test-log-roller";
+  private static Configuration CONF;
+  private static LogRoller ROLLER;
+  private static Path ROOT_DIR;
+  private static FileSystem FS;
+
+  @Before
+  public void setup() throws Exception {
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+    CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+    ROOT_DIR = TEST_UTIL.getRandomDir();
+    FS = FileSystem.get(CONF);
+    RegionServerServices services = Mockito.mock(RegionServerServices.class);
+    Mockito.when(services.getConfiguration()).thenReturn(CONF);
+    ROLLER = new LogRoller(services);
+    ROLLER.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    ROLLER.close();
+    FS.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * verify that each wal roll separately
+   */
+  @Test
+  public void testRequestRollWithMultiWal() throws Exception {
+    // add multiple wal
+    Map<FSHLog, Path> wals = new HashMap<>();
+    for (int i = 1; i <= 3; i++) {
+      FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+        true, WAL_PREFIX, "." + i);
+      wal.init();
+      wals.put(wal, wal.getCurrentFileName());
+      ROLLER.addWAL(wal);
+      Thread.sleep(1000);
+    }
+
+    // request roll
+    Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
+    Map.Entry<FSHLog, Path> walEntry = it.next();
+    walEntry.getKey().requestLogRoll();
+    Thread.sleep(5000);
+
+    assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+    walEntry.setValue(walEntry.getKey().getCurrentFileName());
+    while (it.hasNext()) {
+      walEntry = it.next();
+      assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+    }
+
+    // period roll
+    Thread.sleep(LOG_ROLL_PERIOD + 5000);
+    for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
+      assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
+      entry.getKey().close();
+    }
+  }
+}