You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2020/07/28 04:37:56 UTC
[hbase] branch branch-2.3 updated: HBASE-24665 MultiWAL : Avoid
rolling of ALL WALs when one of the WAL needs a roll #2155
This is an automated email from the ASF dual-hosted git repository.
anoopsamjohn pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 7ef1aca HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll #2155
7ef1aca is described below
commit 7ef1aca01af370e506986aa1c3769e68eb8bebdf
Author: WenFeiYi <we...@gmail.com>
AuthorDate: Tue Jul 28 12:37:37 2020 +0800
HBASE-24665 MultiWAL : Avoid rolling of ALL WALs when one of the WAL needs a roll #2155
Co-authored-by: wen_yi <li...@immomo.com>
Signed-off-by: Anoop <an...@apache.org>
Signed-off-by: Ramkrishna <ra...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../hadoop/hbase/regionserver/LogRoller.java | 4 +-
.../apache/hadoop/hbase/wal/AbstractWALRoller.java | 119 +++++++++++++--------
.../hadoop/hbase/regionserver/TestLogRoller.java | 117 ++++++++++++++++++++
3 files changed, 196 insertions(+), 44 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f5049c9..992b117 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -63,7 +63,7 @@ public class LogRoller extends AbstractWALRoller<RegionServerServices> {
}
@VisibleForTesting
- Map<WAL, Boolean> getWalNeedsRoll() {
- return this.walNeedsRoll;
+ Map<WAL, RollController> getWalNeedsRoll() {
+ return this.wals;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 3154c19..8aa8a63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
@@ -56,31 +55,31 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
- protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
+ protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
protected final T abortable;
- private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
- private long checkLowReplicationInterval;
+ private final long checkLowReplicationInterval;
private volatile boolean running = true;
public void addWAL(WAL wal) {
// check without lock first
- if (walNeedsRoll.containsKey(wal)) {
+ if (wals.containsKey(wal)) {
return;
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
- if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
+ if (wals.putIfAbsent(wal, new RollController(wal)) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized (AbstractWALRoller.this) {
- walNeedsRoll.put(wal, Boolean.TRUE);
+ RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal));
+ controller.requestRoll();
AbstractWALRoller.this.notifyAll();
}
}
@@ -91,9 +90,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
public void requestRollAll() {
synchronized (this) {
- List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
- for (WAL wal : wals) {
- walNeedsRoll.put(wal, Boolean.TRUE);
+ for (RollController controller : wals.values()) {
+ controller.requestRoll();
}
notifyAll();
}
@@ -113,9 +111,9 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
*/
private void checkLowReplication(long now) {
try {
- for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
+ for (Entry<WAL, RollController> entry : wals.entrySet()) {
WAL wal = entry.getKey();
- boolean needRollAlready = entry.getValue();
+ boolean needRollAlready = entry.getValue().needsRoll(now);
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
@@ -131,7 +129,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
// is already broken.
- for (WAL wal : walNeedsRoll.keySet()) {
+ for (WAL wal : wals.keySet()) {
// shutdown rather than close here since we are going to abort the RS and the wals need to be
// split when recovery
try {
@@ -146,42 +144,39 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
@Override
public void run() {
while (running) {
- boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
- periodic = (now - this.lastRollTime) > this.rollPeriod;
- if (periodic) {
- // Time for periodic roll, fall through
- LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
- } else {
- synchronized (this) {
- if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
- // WAL roll requested, fall through
- LOG.debug("WAL roll requested");
- } else {
- try {
- wait(this.threadWakeFrequency);
- } catch (InterruptedException e) {
- // restore the interrupt state
- Thread.currentThread().interrupt();
- }
- // goto the beginning to check whether again whether we should fall through to roll
- // several WALs, and also check whether we should quit.
- continue;
+ synchronized (this) {
+ if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) {
+ try {
+ wait(this.threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // restore the interrupt state
+ Thread.currentThread().interrupt();
}
+ // go back to the beginning to check again whether we should fall through to roll
+ // several WALs, and also check whether we should quit.
+ continue;
}
}
try {
- this.lastRollTime = System.currentTimeMillis();
- for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
- .hasNext();) {
- Entry<WAL, Boolean> entry = iter.next();
+ for (Iterator<Entry<WAL, RollController>> iter = wals.entrySet().iterator();
+ iter.hasNext();) {
+ Entry<WAL, RollController> entry = iter.next();
WAL wal = entry.getKey();
- // reset the flag in front to avoid missing roll request before we return from rollWriter.
- walNeedsRoll.put(wal, Boolean.FALSE);
+ RollController controller = entry.getValue();
+ if (controller.isRollRequested()) {
+ // WAL roll requested, fall through
+ LOG.debug("WAL {} roll requested", wal);
+ } else if (controller.needsPeriodicRoll(now)){
+ // Time for periodic roll, fall through
+ LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod);
+ } else {
+ continue;
+ }
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
- byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
+ byte[][] regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
@@ -223,7 +218,8 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
- return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
+ return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis()))
+ && isWaiting();
}
/**
@@ -240,4 +236,43 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread
running = false;
interrupt();
}
+
+ /**
+ * Independently controls the roll of each WAL. When multiwal is used,
+ * this avoids rolling all WALs together. See HBASE-24665 for details.
+ */
+ protected class RollController {
+ private final WAL wal;
+ private final AtomicBoolean rollRequest;
+ private long lastRollTime;
+
+ RollController(WAL wal) {
+ this.wal = wal;
+ this.rollRequest = new AtomicBoolean(false);
+ this.lastRollTime = System.currentTimeMillis();
+ }
+
+ public void requestRoll() {
+ this.rollRequest.set(true);
+ }
+
+ public byte[][] rollWal(long now) throws IOException {
+ this.lastRollTime = now;
+ // reset the flag in front to avoid missing roll request before we return from rollWriter.
+ this.rollRequest.set(false);
+ return wal.rollWriter(true);
+ }
+
+ public boolean isRollRequested() {
+ return rollRequest.get();
+ }
+
+ public boolean needsPeriodicRoll(long now) {
+ return (now - this.lastRollTime) > rollPeriod;
+ }
+
+ public boolean needsRoll(long now) {
+ return isRollRequested() || needsPeriodicRoll(now);
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
new file mode 100644
index 0000000..86b9bc3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestLogRoller {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLogRoller.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final int LOG_ROLL_PERIOD = 20 * 1000;
+ private static final String LOG_DIR = "WALs";
+ private static final String ARCHIVE_DIR = "archiveWALs";
+ private static final String WAL_PREFIX = "test-log-roller";
+ private static Configuration CONF;
+ private static LogRoller ROLLER;
+ private static Path ROOT_DIR;
+ private static FileSystem FS;
+
+ @Before
+ public void setup() throws Exception {
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD);
+ CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300);
+ ROOT_DIR = TEST_UTIL.getRandomDir();
+ FS = FileSystem.get(CONF);
+ RegionServerServices services = Mockito.mock(RegionServerServices.class);
+ Mockito.when(services.getConfiguration()).thenReturn(CONF);
+ ROLLER = new LogRoller(services);
+ ROLLER.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ ROLLER.close();
+ FS.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Verify that each WAL rolls separately.
+ */
+ @Test
+ public void testRequestRollWithMultiWal() throws Exception {
+ // add multiple wal
+ Map<FSHLog, Path> wals = new HashMap<>();
+ for (int i = 1; i <= 3; i++) {
+ FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null,
+ true, WAL_PREFIX, "." + i);
+ wal.init();
+ wals.put(wal, wal.getCurrentFileName());
+ ROLLER.addWAL(wal);
+ Thread.sleep(1000);
+ }
+
+ // request roll
+ Iterator<Map.Entry<FSHLog, Path>> it = wals.entrySet().iterator();
+ Map.Entry<FSHLog, Path> walEntry = it.next();
+ walEntry.getKey().requestLogRoll();
+ Thread.sleep(5000);
+
+ assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+ walEntry.setValue(walEntry.getKey().getCurrentFileName());
+ while (it.hasNext()) {
+ walEntry = it.next();
+ assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName());
+ }
+
+ // periodic roll
+ Thread.sleep(LOG_ROLL_PERIOD + 5000);
+ for (Map.Entry<FSHLog, Path> entry : wals.entrySet()) {
+ assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName());
+ entry.getKey().close();
+ }
+ }
+}