You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/24 07:49:14 UTC

[hbase] branch branch-1 updated: HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 13787ae  HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)
13787ae is described below

commit 13787ae03ccd408a24fba67be14b5c929874214a
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Nov 24 02:48:26 2021 -0500

    HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/namequeues/NamedQueueRecorder.java       | 54 +++++++++++++++++-----
 .../hadoop/hbase/regionserver/HRegionServer.java   |  8 ++++
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index 9101ce6..c9d1729 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -23,15 +23,22 @@ import com.google.common.base.Preconditions;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * NamedQueue recorder that maintains various named queues.
@@ -41,14 +48,16 @@ import org.apache.hadoop.hbase.util.Threads;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class NamedQueueRecorder {
-
+public final class NamedQueueRecorder implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(NamedQueueRecorder.class);
   private final Disruptor<RingBufferEnvelope> disruptor;
   private final LogEventHandler logEventHandler;
+  private final ExecutorService executorService;
 
   private static NamedQueueRecorder namedQueueRecorder;
   private static boolean isInit = false;
   private static final Object LOCK = new Object();
+  private volatile boolean closed = false;
 
   /**
    * Initialize disruptor with configurable ringbuffer size
@@ -61,10 +70,10 @@ public final class NamedQueueRecorder {
 
     int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
 
+    this.executorService = Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory(
+      hostingThreadName + ".slowlog.append-pool"));
     // disruptor initialization with BlockingWaitStrategy
-    this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), Executors.
-      newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName
-        + ".slowlog.append-pool")),
+    this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), executorService,
       ProducerType.MULTI, new BlockingWaitStrategy());
     this.disruptor.handleExceptionsWith(new DisruptorExceptionHandler());
 
@@ -142,12 +151,14 @@ public final class NamedQueueRecorder {
    *   service
    */
   public void addRecord(NamedQueuePayload namedQueuePayload) {
-    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
-    long seqId = ringBuffer.next();
-    try {
-      ringBuffer.get(seqId).load(namedQueuePayload);
-    } finally {
-      ringBuffer.publish(seqId);
+    if (!closed) {
+      RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
+      long seqId = ringBuffer.next();
+      try {
+        ringBuffer.get(seqId).load(namedQueuePayload);
+      } finally {
+        ringBuffer.publish(seqId);
+      }
     }
   }
 
@@ -161,4 +172,25 @@ public final class NamedQueueRecorder {
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    // Setting closed flag to true so that we don't add more events to RingBuffer.
+    this.closed = true;
+    LOG.info("Closing NamedQueueRecorder");
+    if (this.disruptor != null) {
+      long timeoutms = 5000;
+      try {
+        this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        LOG.warn("Timed out bringing down disruptor after " + timeoutms + " ms; forcing halt", e);
+        this.disruptor.halt();
+        this.disruptor.shutdown();
+      }
+    }
+    // With disruptor down, this is safe to let go.
+    if (this.executorService !=  null) {
+      // This will close the executor threads.
+      this.executorService.shutdownNow();
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a5500a1..0a423d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1231,6 +1231,14 @@ public class HRegionServer extends HasThread implements
       stopServiceThreads();
     }
 
+    try {
+      if (this.namedQueueRecorder != null) {
+        namedQueueRecorder.close();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Attempt to close NamedQueueRecorder failed", ioe);
+    }
+
     if (this.rpcServices != null) {
       this.rpcServices.stop();
     }