You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2020/10/16 12:42:43 UTC

[hbase] branch branch-1 updated: HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)

This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new e066951  HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)
e066951 is described below

commit e06695112a358706344cc8682f2713b43daab340
Author: WenFeiYi <we...@gmail.com>
AuthorDate: Fri Oct 16 20:42:18 2020 +0800

    HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)
    
    Signed-off-by: Reid Chan <re...@apache.org>
---
 .../hadoop/hbase/regionserver/LogRoller.java       | 108 ++++++++++++++-----
 .../hadoop/hbase/regionserver/TestLogRoller.java   | 114 +++++++++++++++++++++
 2 files changed, 194 insertions(+), 28 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index fd208c2..08d5a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -56,23 +57,27 @@ public class LogRoller extends HasThread {
   private static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();
   private final AtomicBoolean rollLog = new AtomicBoolean(false);
-  private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
-      new ConcurrentHashMap<WAL, Boolean>();
+  private final ConcurrentHashMap<WAL, RollController> wals =
+      new ConcurrentHashMap<WAL, RollController>();
   private final Server server;
   protected final RegionServerServices services;
-  private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
-  private final long rollperiod;
+  private final long rollPeriod;
   private final int threadWakeFrequency;
   // The interval to check low replication on hlog's pipeline
-  private long checkLowReplicationInterval;
+  private final long checkLowReplicationInterval;
 
   public void addWAL(final WAL wal) {
-    if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+    if (null == wals.putIfAbsent(wal, new RollController(wal))) {
       wal.registerWALActionsListener(new WALActionsListener.Base() {
         @Override
         public void logRollRequested(WALActionsListener.RollRequestReason reason) {
-          walNeedsRoll.put(wal, Boolean.TRUE);
+          RollController controller = wals.get(wal);
+          if (controller == null) {
+            wals.putIfAbsent(wal, new RollController(wal));
+            controller = wals.get(wal);
+          }
+          controller.requestRoll();
           // TODO logs will contend with each other here, replace with e.g. DelayedQueue
           synchronized(rollLog) {
             rollLog.set(true);
@@ -84,8 +89,8 @@ public class LogRoller extends HasThread {
   }
 
   public void requestRollAll() {
-    for (WAL wal : walNeedsRoll.keySet()) {
-      walNeedsRoll.put(wal, Boolean.TRUE);
+    for (RollController controller : wals.values()) {
+      controller.requestRoll();
     }
     synchronized(rollLog) {
       rollLog.set(true);
@@ -98,7 +103,7 @@ public class LogRoller extends HasThread {
     super("LogRoller");
     this.server = server;
     this.services = services;
-    this.rollperiod = this.server.getConfiguration().
+    this.rollPeriod = this.server.getConfiguration().
       getLong("hbase.regionserver.logroll.period", 3600000);
     this.threadWakeFrequency = this.server.getConfiguration().
       getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -120,9 +125,9 @@ public class LogRoller extends HasThread {
    */
   void checkLowReplication(long now) {
     try {
-      for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+      for (Entry<WAL, RollController> entry : wals.entrySet()) {
         WAL wal = entry.getKey();
-        boolean neeRollAlready = entry.getValue();
+        boolean neeRollAlready = entry.getValue().needsRoll(now);
         if(wal instanceof FSHLog && !neeRollAlready) {
           FSHLog hlog = (FSHLog)wal;
           if ((now - hlog.getLastTimeCheckLowReplication())
@@ -139,11 +144,16 @@ public class LogRoller extends HasThread {
   @Override
   public void run() {
     while (!server.isStopped()) {
-      long now = System.currentTimeMillis();
+      long now = EnvironmentEdgeManager.currentTime();
       checkLowReplication(now);
-      boolean periodic = false;
       if (!rollLog.get()) {
-        periodic = (now - this.lastrolltime) > this.rollperiod;
+        boolean periodic = false;
+        for (RollController controller : wals.values()) {
+          if (controller.needsPeriodicRoll(now)) {
+            periodic = true;
+            break;
+          }
+        }
         if (!periodic) {
           synchronized (rollLog) {
             try {
@@ -156,23 +166,24 @@ public class LogRoller extends HasThread {
           }
           continue;
         }
-        // Time for periodic roll
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
-        }
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("WAL roll requested");
       }
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
-        this.lastrolltime = now;
-        for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+        for (Entry<WAL, RollController> entry : wals.entrySet()) {
           final WAL wal = entry.getKey();
+          RollController controller = entry.getValue();
+          if (controller.isRollRequested()) {
+            // WAL roll requested, fall through
+            LOG.debug("WAL " + wal + " roll requested");
+          } else if (controller.needsPeriodicRoll(now)) {
+            // Time for periodic roll, fall through
+            LOG.debug("WAL " + wal + " roll period " + this.rollPeriod + "ms elapsed");
+          } else {
+            continue;
+          }
           // Force the roll if the logroll.period is elapsed or if a roll was requested.
           // The returned value is an array of actual region names.
-          final byte [][] regionsToFlush = wal.rollWriter(periodic ||
-              entry.getValue().booleanValue());
-          walNeedsRoll.put(wal, Boolean.FALSE);
+          final byte [][] regionsToFlush = controller.rollWal(now);
           if (regionsToFlush != null) {
             for (byte [] r: regionsToFlush) scheduleFlush(r);
           }
@@ -229,11 +240,52 @@ public class LogRoller extends HasThread {
    */
   @VisibleForTesting
   public boolean walRollFinished() {
-    for (boolean needRoll : walNeedsRoll.values()) {
-      if (needRoll) {
+    long now = EnvironmentEdgeManager.currentTime();
+    for (RollController controller : wals.values()) {
+      if (controller.needsRoll(now)) {
         return false;
       }
     }
     return true;
   }
+
+
+  /**
+   * Independently control the roll of each wal. When use multiwal,
+   * can avoid all wal roll together. see HBASE-24665 for detail
+   */
+  protected class RollController {
+    private final WAL wal;
+    private final AtomicBoolean rollRequest;
+    private long lastRollTime;
+
+    RollController(WAL wal) {
+      this.wal = wal;
+      this.rollRequest = new AtomicBoolean(false);
+      this.lastRollTime = EnvironmentEdgeManager.currentTime();
+    }
+
+    public void requestRoll() {
+      this.rollRequest.set(true);
+    }
+
+    public byte[][] rollWal(long now) throws IOException {
+      this.lastRollTime = now;
+      byte[][] regionsToFlush = wal.rollWriter(true);
+      this.rollRequest.set(false);
+      return regionsToFlush;
+    }
+
+    public boolean isRollRequested() {
+      return rollRequest.get();
+    }
+
+    public boolean needsPeriodicRoll(long now) {
+      return (now - this.lastRollTime) > rollPeriod;
+    }
+
+    public boolean needsRoll(long now) {
+      return isRollRequested() || needsPeriodicRoll(now);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
new file mode 100644
index 0000000..fac6b99
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestLogRoller {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final int LOG_ROLL_PERIOD = 20 * 1000;
+  private static final String LOG_DIR = "WALs";
+  private static final String ARCHIVE_DIR = "archiveWALs";
+  private static final String WAL_PREFIX = "test-log-roller";
+  private static Configuration CONF;
+  private static LogRoller ROLLER;
+  private static Path ROOT_DIR;
+  private static FileSystem FS;
+
+  @Before
+  public void setup() throws Exception {
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+    CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+    ROOT_DIR = TEST_UTIL.getRandomDir();
+    FS = FileSystem.get(CONF);
+    HRegionServer server = Mockito.mock(HRegionServer.class);
+    Mockito.when(server.getConfiguration()).thenReturn(CONF);
+    RegionServerServices services = Mockito.mock(RegionServerServices.class);
+    ROLLER = new LogRoller(server, services);
+    ROLLER.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    ROLLER.interrupt();
+    FS.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * verify that each wal roll separately
+   */
+  @Test
+  public void testRequestRollWithMultiWal() throws Exception {
+    // add multiple wal
+    Map<FSHLog, Path> wals = new HashMap<FSHLog, Path>();
+    for (int i = 1; i <= 3; i++) {
+      FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+          true, WAL_PREFIX, "." + i);
+      wal.rollWriter(true);
+      wals.put(wal, wal.getCurrentFileName());
+      ROLLER.addWAL(wal);
+      Thread.sleep(1000);
+    }
+
+    // request roll
+    Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
+    Map.Entry<FSHLog, Path> walEntry = it.next();
+    walEntry.getKey().requestLogRoll();
+    Thread.sleep(5000);
+
+    assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+    walEntry.setValue(walEntry.getKey().getCurrentFileName());
+    while (it.hasNext()) {
+      walEntry = it.next();
+      assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+    }
+
+    // period roll
+    Thread.sleep(LOG_ROLL_PERIOD + 5000);
+    for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
+      assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
+      entry.getKey().close();
+    }
+  }
+}