You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/06/06 14:17:45 UTC
[hbase] branch branch-2 updated: HBASE-27855 Support dynamic adjustment of flusher count (#5247)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 854ecd7c8cc HBASE-27855 Support dynamic adjustment of flusher count (#5247)
854ecd7c8cc is described below
commit 854ecd7c8ccf71336a8a03437bab1b579292517b
Author: Ruanhui <32...@users.noreply.github.com>
AuthorDate: Tue Jun 6 22:00:50 2023 +0800
HBASE-27855 Support dynamic adjustment of flusher count (#5247)
Co-authored-by: huiruan <87...@qq.com>
Signed-off-by: Duo Zhang <zh...@apache.org>
(cherry picked from commit 40976b0969d549ed6651c8bbc746ed7da33efd01)
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
---
.../hadoop/hbase/regionserver/HRegionServer.java | 3 +-
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 148 ++++++++++++++++-----
.../hbase/regionserver/TestMemStoreFlusher.java | 22 +++
3 files changed, 139 insertions(+), 34 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2e36d2efe4e..2e8270ac175 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2379,6 +2379,7 @@ public class HRegionServer extends Thread
}
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
+ configurationManager.registerObserver(this.cacheFlusher);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
@@ -2865,7 +2866,7 @@ public class HRegionServer extends Thread
bootstrapNodeManager.stop();
}
if (this.cacheFlusher != null) {
- this.cacheFlusher.join();
+ this.cacheFlusher.shutdown();
}
if (this.walRoller != null) {
this.walRoller.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 23da77759a5..2c9b310061f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -22,6 +22,7 @@ import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,12 +36,14 @@ import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,7 +65,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* @see FlushRequester
*/
@InterfaceAudience.Private
-public class MemStoreFlusher implements FlushRequester {
+public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);
private Configuration conf;
@@ -80,7 +83,12 @@ public class MemStoreFlusher implements FlushRequester {
private long blockingWaitTime;
private final LongAdder updatesBlockedMsHighWater = new LongAdder();
- private final FlushHandler[] flushHandlers;
+ private FlushHandler[] flushHandlers;
+
+ private final AtomicInteger flusherIdGen = new AtomicInteger();
+
+ private ThreadFactory flusherThreadFactory;
+
private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
/**
@@ -116,14 +124,9 @@ public class MemStoreFlusher implements FlushRequester {
this.server = server;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
- int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
+ int handlerCount = 0;
if (server != null) {
- if (handlerCount < 1) {
- LOG.warn(
- "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
- handlerCount);
- handlerCount = 1;
- }
+ handlerCount = getHandlerCount(conf);
LOG.info("globalMemStoreLimit="
+ TraditionalBinaryPrefix
.long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
@@ -304,13 +307,15 @@ public class MemStoreFlusher implements FlushRequester {
private class FlushHandler extends Thread {
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
private FlushHandler(String name) {
super(name);
}
@Override
public void run() {
- while (!server.isStopped()) {
+ while (!server.isStopped() && running.get()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
@@ -355,15 +360,24 @@ public class MemStoreFlusher implements FlushRequester {
}
}
}
- synchronized (regionsInQueue) {
- regionsInQueue.clear();
- flushQueue.clear();
- }
- // Signal anyone waiting, so they see the close flag
- wakeUpIfBlocking();
+ if (server.isStopped()) {
+ synchronized (regionsInQueue) {
+ regionsInQueue.clear();
+ flushQueue.clear();
+ }
+
+ // Signal anyone waiting, so they see the close flag
+ wakeUpIfBlocking();
+ }
LOG.info(getName() + " exiting");
}
+
+ public void shutdown() {
+ if (!running.compareAndSet(true, false)) {
+ LOG.warn("{} is already signaled to shutdown", getName());
+ }
+ }
}
private void wakeupFlushThread() {
@@ -493,8 +507,10 @@ public class MemStoreFlusher implements FlushRequester {
void interruptIfNecessary() {
lock.writeLock().lock();
try {
- for (FlushHandler flushHander : flushHandlers) {
- if (flushHander != null) flushHander.interrupt();
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null) {
+ flushHandler.interrupt();
+ }
}
} finally {
lock.writeLock().unlock();
@@ -502,30 +518,40 @@ public class MemStoreFlusher implements FlushRequester {
}
synchronized void start(UncaughtExceptionHandler eh) {
- ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
- .setDaemon(true).setUncaughtExceptionHandler(eh).build();
- for (int i = 0; i < flushHandlers.length; i++) {
- flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
- flusherThreadFactory.newThread(flushHandlers[i]);
- flushHandlers[i].start();
+ this.flusherThreadFactory =
+ new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
+ lock.readLock().lock();
+ try {
+ startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length);
+ } finally {
+ lock.readLock().unlock();
}
}
boolean isAlive() {
- for (FlushHandler flushHander : flushHandlers) {
- if (flushHander != null && flushHander.isAlive()) {
- return true;
+ lock.readLock().lock();
+ try {
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null && flushHandler.isAlive()) {
+ return true;
+ }
}
+ return false;
+ } finally {
+ lock.readLock().unlock();
}
- return false;
}
- void join() {
- for (FlushHandler flushHander : flushHandlers) {
- if (flushHander != null) {
- Threads.shutdown(flushHander);
+ void shutdown() {
+ lock.readLock().lock();
+ try {
+ for (FlushHandler flushHandler : flushHandlers) {
+ if (flushHandler != null) {
+ Threads.shutdown(flushHandler);
+ }
}
+ } finally {
+ lock.readLock().unlock();
}
}
@@ -920,4 +946,60 @@ public class MemStoreFlusher implements FlushRequester {
return compareTo(other) == 0;
}
}
+
+ private int getHandlerCount(Configuration conf) {
+ int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
+ if (handlerCount < 1) {
+ LOG.warn(
+ "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
+ handlerCount);
+ handlerCount = 1;
+ }
+ return handlerCount;
+ }
+
+ @Override
+ public void onConfigurationChange(Configuration newConf) {
+ int newHandlerCount = getHandlerCount(newConf);
+ if (newHandlerCount != flushHandlers.length) {
+ LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
+ newHandlerCount);
+ lock.writeLock().lock();
+ try {
+ FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
+ if (newHandlerCount > flushHandlers.length) {
+ startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
+ } else {
+ stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
+ }
+ flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
+ this.flushHandlers = newFlushHandlers;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+
+ private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
+ if (flusherThreadFactory != null) {
+ for (int i = start; i < end; i++) {
+ flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
+ flusherThreadFactory.newThread(flushHandlers[i]);
+ flushHandlers[i].start();
+ }
+ }
+ }
+
+ private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
+ for (int i = start; i < end; i++) {
+ flushHandlers[i].shutdown();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
+ }
+ }
+ }
+
+ public int getFlusherCount() {
+ return flusherIdGen.get();
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
index 0bc7946e8a1..158dd91d9a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
@@ -94,4 +94,26 @@ public class TestMemStoreFlusher {
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
}
+
+ @Test
+ public void testChangeFlusherCount() {
+ Configuration conf = new Configuration();
+ conf.set("hbase.hstore.flusher.count", "0");
+ HRegionServer rs = mock(HRegionServer.class);
+ doReturn(false).when(rs).isStopped();
+ doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();
+
+ msf = new MemStoreFlusher(conf, rs);
+ msf.start(Threads.LOGGING_EXCEPTION_HANDLER);
+
+ Configuration newConf = new Configuration();
+
+ newConf.set("hbase.hstore.flusher.count", "3");
+ msf.onConfigurationChange(newConf);
+ assertEquals(3, msf.getFlusherCount());
+
+ newConf.set("hbase.hstore.flusher.count", "0");
+ msf.onConfigurationChange(newConf);
+ assertEquals(1, msf.getFlusherCount());
+ }
}