You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by re...@apache.org on 2020/10/16 12:42:43 UTC
[hbase] branch branch-1 updated: HBASE-24849 Branch-1 Backport :
HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs
a roll (#2194)
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new e066951 HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)
e066951 is described below
commit e06695112a358706344cc8682f2713b43daab340
Author: WenFeiYi <we...@gmail.com>
AuthorDate: Fri Oct 16 20:42:18 2020 +0800
HBASE-24849 Branch-1 Backport : HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll (#2194)
Signed-off-by: Reid Chan <re...@apache.org>
---
.../hadoop/hbase/regionserver/LogRoller.java | 108 ++++++++++++++-----
.../hadoop/hbase/regionserver/TestLogRoller.java | 114 +++++++++++++++++++++
2 files changed, 194 insertions(+), 28 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index fd208c2..08d5a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
@@ -56,23 +57,27 @@ public class LogRoller extends HasThread {
private static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final AtomicBoolean rollLog = new AtomicBoolean(false);
- private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
- new ConcurrentHashMap<WAL, Boolean>();
+ private final ConcurrentHashMap<WAL, RollController> wals =
+ new ConcurrentHashMap<WAL, RollController>();
private final Server server;
protected final RegionServerServices services;
- private volatile long lastrolltime = System.currentTimeMillis();
// Period to roll log.
- private final long rollperiod;
+ private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
- private long checkLowReplicationInterval;
+ private final long checkLowReplicationInterval;
public void addWAL(final WAL wal) {
- if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
+ if (null == wals.putIfAbsent(wal, new RollController(wal))) {
wal.registerWALActionsListener(new WALActionsListener.Base() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
- walNeedsRoll.put(wal, Boolean.TRUE);
+ RollController controller = wals.get(wal);
+ if (controller == null) {
+ wals.putIfAbsent(wal, new RollController(wal));
+ controller = wals.get(wal);
+ }
+ controller.requestRoll();
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized(rollLog) {
rollLog.set(true);
@@ -84,8 +89,8 @@ public class LogRoller extends HasThread {
}
public void requestRollAll() {
- for (WAL wal : walNeedsRoll.keySet()) {
- walNeedsRoll.put(wal, Boolean.TRUE);
+ for (RollController controller : wals.values()) {
+ controller.requestRoll();
}
synchronized(rollLog) {
rollLog.set(true);
@@ -98,7 +103,7 @@ public class LogRoller extends HasThread {
super("LogRoller");
this.server = server;
this.services = services;
- this.rollperiod = this.server.getConfiguration().
+ this.rollPeriod = this.server.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -120,9 +125,9 @@ public class LogRoller extends HasThread {
*/
void checkLowReplication(long now) {
try {
- for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+ for (Entry<WAL, RollController> entry : wals.entrySet()) {
WAL wal = entry.getKey();
- boolean neeRollAlready = entry.getValue();
+ boolean neeRollAlready = entry.getValue().needsRoll(now);
if(wal instanceof FSHLog && !neeRollAlready) {
FSHLog hlog = (FSHLog)wal;
if ((now - hlog.getLastTimeCheckLowReplication())
@@ -139,11 +144,16 @@ public class LogRoller extends HasThread {
@Override
public void run() {
while (!server.isStopped()) {
- long now = System.currentTimeMillis();
+ long now = EnvironmentEdgeManager.currentTime();
checkLowReplication(now);
- boolean periodic = false;
if (!rollLog.get()) {
- periodic = (now - this.lastrolltime) > this.rollperiod;
+ boolean periodic = false;
+ for (RollController controller : wals.values()) {
+ if (controller.needsPeriodicRoll(now)) {
+ periodic = true;
+ break;
+ }
+ }
if (!periodic) {
synchronized (rollLog) {
try {
@@ -156,23 +166,24 @@ public class LogRoller extends HasThread {
}
continue;
}
- // Time for periodic roll
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
- }
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("WAL roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
- this.lastrolltime = now;
- for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+ for (Entry<WAL, RollController> entry : wals.entrySet()) {
final WAL wal = entry.getKey();
+ RollController controller = entry.getValue();
+ if (controller.isRollRequested()) {
+ // WAL roll requested, fall through
+ LOG.debug("WAL " + wal + " roll requested");
+ } else if (controller.needsPeriodicRoll(now)) {
+ // Time for periodic roll, fall through
+ LOG.debug("WAL " + wal + " roll period " + this.rollPeriod + "ms elapsed");
+ } else {
+ continue;
+ }
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
- final byte [][] regionsToFlush = wal.rollWriter(periodic ||
- entry.getValue().booleanValue());
- walNeedsRoll.put(wal, Boolean.FALSE);
+ final byte [][] regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
@@ -229,11 +240,52 @@ public class LogRoller extends HasThread {
*/
@VisibleForTesting
public boolean walRollFinished() {
- for (boolean needRoll : walNeedsRoll.values()) {
- if (needRoll) {
+ long now = EnvironmentEdgeManager.currentTime();
+ for (RollController controller : wals.values()) {
+ if (controller.needsRoll(now)) {
return false;
}
}
return true;
}
+
+
+ /**
+ * Tracks roll state for a single WAL (a roll-request flag and the last roll time) so that,
+ * with multiwal, each WAL is rolled independently instead of all together. See HBASE-24665.
+ */
+ protected class RollController {
+ private final WAL wal; // the WAL this controller rolls
+ private final AtomicBoolean rollRequest; // true while an explicit roll request is pending
+ private long lastRollTime; // NOTE(review): plain long, not volatile; confirm every reader runs on the roller thread
+ // Starts with no roll requested; the periodic-roll clock starts at construction time.
+ RollController(WAL wal) {
+ this.wal = wal;
+ this.rollRequest = new AtomicBoolean(false);
+ this.lastRollTime = EnvironmentEdgeManager.currentTime();
+ }
+ // Flags this WAL as needing a roll; the flag stays set until rollWal() clears it.
+ public void requestRoll() {
+ this.rollRequest.set(true);
+ }
+ // Forces a roll of the WAL and returns whatever wal.rollWriter(true) returns.
+ public byte[][] rollWal(long now) throws IOException {
+ this.lastRollTime = now; // assigned before the roll, so it advances even if rollWriter throws
+ byte[][] regionsToFlush = wal.rollWriter(true);
+ this.rollRequest.set(false); // NOTE(review): cleared after the roll returns, so a requestRoll() made while rollWriter runs is overwritten; confirm
+ return regionsToFlush;
+ }
+ // True while an explicit roll request is pending.
+ public boolean isRollRequested() {
+ return rollRequest.get();
+ }
+ // True once more than rollPeriod has passed since lastRollTime.
+ public boolean needsPeriodicRoll(long now) {
+ return (now - this.lastRollTime) > rollPeriod; // rollPeriod is a field of the enclosing LogRoller
+ }
+ // True if a roll was explicitly requested or the roll period has elapsed.
+ public boolean needsRoll(long now) {
+ return isRollRequested() || needsPeriodicRoll(now);
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
new file mode 100644
index 0000000..fac6b99
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestLogRoller {
+ // Exercises LogRoller with several WALs: a requested roll and the periodic roll.
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final int LOG_ROLL_PERIOD = 20 * 1000; // stored as hbase.regionserver.logroll.period in setup()
+ private static final String LOG_DIR = "WALs";
+ private static final String ARCHIVE_DIR = "archiveWALs";
+ private static final String WAL_PREFIX = "test-log-roller";
+ private static Configuration CONF;
+ private static LogRoller ROLLER;
+ private static Path ROOT_DIR;
+ private static FileSystem FS;
+ // Configures the roll period and wake frequency, then starts a LogRoller backed by mocks.
+ @Before
+ public void setup() throws Exception {
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+ CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+ ROOT_DIR = TEST_UTIL.getRandomDir();
+ FS = FileSystem.get(CONF);
+ HRegionServer server = Mockito.mock(HRegionServer.class); // only getConfiguration() is stubbed on this mock
+ Mockito.when(server.getConfiguration()).thenReturn(CONF);
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+ ROLLER = new LogRoller(server, services);
+ ROLLER.start();
+ }
+ // Interrupts the roller thread and releases the file system.
+ @After
+ public void tearDown() throws Exception {
+ ROLLER.interrupt(); // NOTE(review): interrupt only; confirm this actually terminates the roller thread
+ FS.close();
+ TEST_UTIL.shutdownMiniCluster(); // NOTE(review): no mini cluster is started in this class; confirm the call is needed
+ }
+
+ /**
+ * Verifies that a roll request rolls only the requesting WAL and that every WAL rolls periodically.
+ */
+ @Test
+ public void testRequestRollWithMultiWal() throws Exception {
+ // Create three WALs, record each one's current file name, and register them with the roller.
+ Map<FSHLog, Path> wals = new HashMap<FSHLog, Path>();
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, "." + i);
+ wal.rollWriter(true);
+ wals.put(wal, wal.getCurrentFileName());
+ ROLLER.addWAL(wal);
+ Thread.sleep(1000); // presumably staggers the registration times of the WALs; confirm intent
+ }
+
+ // Request a roll on a single WAL: whichever entry the HashMap iterator yields first.
+ Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
+ Map.Entry<FSHLog, Path> walEntry = it.next();
+ walEntry.getKey().requestLogRoll();
+ Thread.sleep(5000); // wait for the roller thread to act on the request
+
+ assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); // the requested WAL has a new file
+ walEntry.setValue(walEntry.getKey().getCurrentFileName()); // remember the new name for the periodic check below
+ while (it.hasNext()) {
+ walEntry = it.next();
+ assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); // the other WALs were not rolled
+ }
+
+ // Periodic roll: once the roll period has passed, every WAL must be on a new file.
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+ for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
+ assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
+ entry.getKey().close();
+ }
+ }
+}