You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/06/06 14:01:00 UTC

[hbase] branch master updated: HBASE-27855 Support dynamic adjustment of flusher count (#5247)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 40976b0969d HBASE-27855 Support dynamic adjustment of flusher count (#5247)
40976b0969d is described below

commit 40976b0969d549ed6651c8bbc746ed7da33efd01
Author: Ruanhui <32...@users.noreply.github.com>
AuthorDate: Tue Jun 6 22:00:50 2023 +0800

    HBASE-27855 Support dynamic adjustment of flusher count (#5247)
    
    Co-authored-by: huiruan <87...@qq.com>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   3 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java | 148 ++++++++++++++++-----
 .../hbase/regionserver/TestMemStoreFlusher.java    |  22 +++
 3 files changed, 139 insertions(+), 34 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1bdf6a225c6..1c5940ca9ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2074,6 +2074,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
     }
     // Registering the compactSplitThread object with the ConfigurationManager.
     configurationManager.registerObserver(this.compactSplitThread);
+    configurationManager.registerObserver(this.cacheFlusher);
     configurationManager.registerObserver(this.rpcServices);
     configurationManager.registerObserver(this);
   }
@@ -2454,7 +2455,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       bootstrapNodeManager.stop();
     }
     if (this.cacheFlusher != null) {
-      this.cacheFlusher.join();
+      this.cacheFlusher.shutdown();
     }
     if (this.walRoller != null) {
       this.walRoller.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 44b19192542..301b97e9a50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -22,6 +22,7 @@ import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -36,12 +37,14 @@ import java.util.concurrent.Delayed;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -63,7 +66,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-public class MemStoreFlusher implements FlushRequester {
+public class MemStoreFlusher implements FlushRequester, ConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(MemStoreFlusher.class);
 
   private Configuration conf;
@@ -81,7 +84,12 @@ public class MemStoreFlusher implements FlushRequester {
   private long blockingWaitTime;
   private final LongAdder updatesBlockedMsHighWater = new LongAdder();
 
-  private final FlushHandler[] flushHandlers;
+  private FlushHandler[] flushHandlers;
+
+  private final AtomicInteger flusherIdGen = new AtomicInteger();
+
+  private ThreadFactory flusherThreadFactory;
+
   private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
 
   /**
@@ -117,14 +125,9 @@ public class MemStoreFlusher implements FlushRequester {
     this.server = server;
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
-    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
+    int handlerCount = 0;
     if (server != null) {
-      if (handlerCount < 1) {
-        LOG.warn(
-          "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
-          handlerCount);
-        handlerCount = 1;
-      }
+      handlerCount = getHandlerCount(conf);
       LOG.info("globalMemStoreLimit="
         + TraditionalBinaryPrefix
           .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
@@ -305,13 +308,15 @@ public class MemStoreFlusher implements FlushRequester {
 
   private class FlushHandler extends Thread {
 
+    private final AtomicBoolean running = new AtomicBoolean(true);
+
     private FlushHandler(String name) {
       super(name);
     }
 
     @Override
     public void run() {
-      while (!server.isStopped()) {
+      while (!server.isStopped() && running.get()) {
         FlushQueueEntry fqe = null;
         try {
           wakeupPending.set(false); // allow someone to wake us up again
@@ -356,15 +361,24 @@ public class MemStoreFlusher implements FlushRequester {
           }
         }
       }
-      synchronized (regionsInQueue) {
-        regionsInQueue.clear();
-        flushQueue.clear();
-      }
 
-      // Signal anyone waiting, so they see the close flag
-      wakeUpIfBlocking();
+      if (server.isStopped()) {
+        synchronized (regionsInQueue) {
+          regionsInQueue.clear();
+          flushQueue.clear();
+        }
+
+        // Signal anyone waiting, so they see the close flag
+        wakeUpIfBlocking();
+      }
       LOG.info(getName() + " exiting");
     }
+    /** Asks this handler to leave its run loop; logs a warning if it was already asked to. */
+    public void shutdown() {
+      if (!running.getAndSet(false)) {
+        LOG.warn("{} is already signaled to shutdown", getName());
+      }
+    }
   }
 
   private void wakeupFlushThread() {
@@ -497,8 +511,10 @@ public class MemStoreFlusher implements FlushRequester {
   void interruptIfNecessary() {
     lock.writeLock().lock();
     try {
-      for (FlushHandler flushHander : flushHandlers) {
-        if (flushHander != null) flushHander.interrupt();
+      for (FlushHandler flushHandler : flushHandlers) {
+        if (flushHandler != null) {
+          flushHandler.interrupt();
+        }
       }
     } finally {
       lock.writeLock().unlock();
@@ -506,30 +522,40 @@ public class MemStoreFlusher implements FlushRequester {
   }
 
   synchronized void start(UncaughtExceptionHandler eh) {
-    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
-      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
-      .setDaemon(true).setUncaughtExceptionHandler(eh).build();
-    for (int i = 0; i < flushHandlers.length; i++) {
-      flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
-      flusherThreadFactory.newThread(flushHandlers[i]);
-      flushHandlers[i].start();
+    this.flusherThreadFactory = // kept in a field so onConfigurationChange can start handlers later
+      new ThreadFactoryBuilder().setDaemon(true).setUncaughtExceptionHandler(eh).build();
+    lock.readLock().lock(); // NOTE(review): only the read lock is held while slots are filled in
+    try {
+      startFlushHandlerThreads(flushHandlers, 0, flushHandlers.length); // start every slot
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
   boolean isAlive() {
-    for (FlushHandler flushHander : flushHandlers) {
-      if (flushHander != null && flushHander.isAlive()) {
-        return true;
+    lock.readLock().lock(); // onConfigurationChange swaps flushHandlers under the write lock
+    try {
+      for (FlushHandler flushHandler : flushHandlers) {
+        if (flushHandler != null && flushHandler.isAlive()) { // slot is null until its handler exists
+          return true;
+        }
       }
+      return false; // no slot holds a live handler thread
+    } finally {
+      lock.readLock().unlock();
     }
-    return false;
   }
 
-  void join() {
-    for (FlushHandler flushHander : flushHandlers) {
-      if (flushHander != null) {
-        Threads.shutdown(flushHander);
+  void shutdown() { // renamed from join(); the HRegionServer hunk above now calls shutdown()
+    lock.readLock().lock();
+    try {
+      for (FlushHandler flushHandler : flushHandlers) { // only handlers still in the current array
+        if (flushHandler != null) {
+          Threads.shutdown(flushHandler);
+        }
       }
+    } finally {
+      lock.readLock().unlock();
     }
   }
 
@@ -924,4 +950,60 @@ public class MemStoreFlusher implements FlushRequester {
       return compareTo(other) == 0;
     }
   }
+
+  private int getHandlerCount(Configuration conf) {
+    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
+    if (handlerCount < 1) {
+      LOG.warn(
+        "hbase.hstore.flusher.count was configed to {} which is less than 1, " + "corrected to 1",
+        handlerCount);
+      handlerCount = 1;
+    }
+    return handlerCount;
+  }
+  /** Resizes the flush handler array when hbase.hstore.flusher.count in newConf differs from it. */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    int newHandlerCount = getHandlerCount(newConf);
+    if (newHandlerCount != flushHandlers.length) { // NOTE(review): checked outside the write lock
+      LOG.info("update hbase.hstore.flusher.count from {} to {}", flushHandlers.length,
+        newHandlerCount);
+      lock.writeLock().lock();
+      try { // copyOf below pads with null when growing and truncates when shrinking
+        FlushHandler[] newFlushHandlers = Arrays.copyOf(flushHandlers, newHandlerCount);
+        if (newHandlerCount > flushHandlers.length) {
+          startFlushHandlerThreads(newFlushHandlers, flushHandlers.length, newFlushHandlers.length);
+        } else { // shrinking: signal the handlers that fall off the end of the old array
+          stopFlushHandlerThreads(flushHandlers, newHandlerCount, flushHandlers.length);
+        } // the CAS below lowers flusherIdGen after a shrink; growth normally advanced it already
+        flusherIdGen.compareAndSet(flushHandlers.length, newFlushHandlers.length);
+        this.flushHandlers = newFlushHandlers;
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+  }
+  /** Creates and starts a FlushHandler in each slot of [start, end); does nothing before start(). */
+  private void startFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
+    if (flusherThreadFactory != null) { // in the visible code the factory is assigned in start()
+      for (int i = start; i < end; i++) {
+        flushHandlers[i] = new FlushHandler("MemStoreFlusher." + flusherIdGen.getAndIncrement());
+        flusherThreadFactory.newThread(flushHandlers[i]); // NOTE(review): returned Thread is discarded
+        flushHandlers[i].start(); // the handler itself is started, not the factory's thread - confirm
+      }
+    }
+  }
+  /** Signals the handlers in [start, end) to exit; slots that were never started are skipped. */
+  private void stopFlushHandlerThreads(FlushHandler[] flushHandlers, int start, int end) {
+    for (int i = start; i < end; i++) {
+      if (flushHandlers[i] != null) { // null when a resize happened before start() made handlers
+        flushHandlers[i].shutdown();
+        LOG.debug("send shutdown signal to {}", flushHandlers[i].getName());
+      }
+    }
+  }
+  /** Reports flusherIdGen: bumped once per handler started, CAS-ed to the new size on resize. */
+  public int getFlusherCount() {
+    return flusherIdGen.get();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
index 0bc7946e8a1..158dd91d9a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java
@@ -94,4 +94,26 @@ public class TestMemStoreFlusher {
     assertEquals(1, msf.getFlushQueueSize());
     assertTrue(msf.regionsInQueue.get(r).isDelay());
   }
+  /** Verifies that onConfigurationChange grows and shrinks the flusher count, clamping at 1. */
+  @Test
+  public void testChangeFlusherCount() {
+    Configuration conf = new Configuration();
+    conf.set("hbase.hstore.flusher.count", "0"); // below the minimum; getHandlerCount corrects to 1
+    HRegionServer rs = mock(HRegionServer.class);
+    doReturn(false).when(rs).isStopped();
+    doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();
+    // Start the flusher so that later count changes actually create handler threads.
+    msf = new MemStoreFlusher(conf, rs);
+    msf.start(Threads.LOGGING_EXCEPTION_HANDLER);
+
+    Configuration newConf = new Configuration();
+    // Growing: the reported count follows the new setting.
+    newConf.set("hbase.hstore.flusher.count", "3");
+    msf.onConfigurationChange(newConf);
+    assertEquals(3, msf.getFlusherCount());
+    // Shrinking below the minimum: the requested 0 is clamped to 1.
+    newConf.set("hbase.hstore.flusher.count", "0");
+    msf.onConfigurationChange(newConf);
+    assertEquals(1, msf.getFlusherCount());
+  }
 }