You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/24 07:49:14 UTC
[hbase] branch branch-1 updated: HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)
This is an automated email from the ASF dual-hosted git repository.
haxiaolin pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 13787ae HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)
13787ae is described below
commit 13787ae03ccd408a24fba67be14b5c929874214a
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Nov 24 02:48:26 2021 -0500
HBASE-26480 Close NamedQueueRecorder to allow HMaster/RS to shutdown gracefully (#3871)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/namequeues/NamedQueueRecorder.java | 54 +++++++++++++++++-----
.../hadoop/hbase/regionserver/HRegionServer.java | 8 ++++
2 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index 9101ce6..c9d1729 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -23,15 +23,22 @@ import com.google.common.base.Preconditions;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* NamedQueue recorder that maintains various named queues.
@@ -41,14 +48,16 @@ import org.apache.hadoop.hbase.util.Threads;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public final class NamedQueueRecorder {
-
+public final class NamedQueueRecorder implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(NamedQueueRecorder.class);
private final Disruptor<RingBufferEnvelope> disruptor;
private final LogEventHandler logEventHandler;
+ private final ExecutorService executorService;
private static NamedQueueRecorder namedQueueRecorder;
private static boolean isInit = false;
private static final Object LOCK = new Object();
+ private volatile boolean closed = false;
/**
* Initialize disruptor with configurable ringbuffer size
@@ -61,10 +70,10 @@ public final class NamedQueueRecorder {
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
+ this.executorService = Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory(
+ hostingThreadName + ".slowlog.append-pool"));
// disruptor initialization with BlockingWaitStrategy
- this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), Executors.
- newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName
- + ".slowlog.append-pool")),
+ this.disruptor = new Disruptor<>(getEventFactory(), getEventCount(eventCount), executorService,
ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.handleExceptionsWith(new DisruptorExceptionHandler());
@@ -142,12 +151,14 @@ public final class NamedQueueRecorder {
* service
*/
public void addRecord(NamedQueuePayload namedQueuePayload) {
- RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
- long seqId = ringBuffer.next();
- try {
- ringBuffer.get(seqId).load(namedQueuePayload);
- } finally {
- ringBuffer.publish(seqId);
+ if (!closed) {
+ RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
+ long seqId = ringBuffer.next();
+ try {
+ ringBuffer.get(seqId).load(namedQueuePayload);
+ } finally {
+ ringBuffer.publish(seqId);
+ }
}
}
@@ -161,4 +172,25 @@ public final class NamedQueueRecorder {
}
}
+ @Override
+ public void close() throws IOException {
+ // Setting closed flag to true so that we don't add more events to RingBuffer.
+ this.closed = true;
+ LOG.info("Closing NamedQueueRecorder");
+ if (this.disruptor != null) {
+ long timeoutms = 5000;
+ try {
+ this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.warn("Timed out bringing down disruptor after " + timeoutms + " ms; forcing halt", e);
+ this.disruptor.halt();
+ this.disruptor.shutdown();
+ }
+ }
+ // With disruptor down, this is safe to let go.
+ if (this.executorService != null) {
+ // This will close the executor threads.
+ this.executorService.shutdownNow();
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a5500a1..0a423d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1231,6 +1231,14 @@ public class HRegionServer extends HasThread implements
stopServiceThreads();
}
+ try {
+ if (this.namedQueueRecorder != null) {
+ namedQueueRecorder.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Attempt to close NamedQueueRecorder failed", ioe);
+ }
+
if (this.rpcServices != null) {
this.rpcServices.stop();
}