You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2020/08/01 17:16:53 UTC
[hbase] branch branch-2 updated: HBASE-24695 FSHLog - close the
current WAL file in a background thread. (#2183)
This is an automated email from the ASF dual-hosted git repository.
anoopsamjohn pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 86fccba HBASE-24695 FSHLog - close the current WAL file in a background thread. (#2183)
86fccba is described below
commit 86fccba0d0c1671ebf71259a3eed3abc43359714
Author: Anoop Sam John <an...@gmail.com>
AuthorDate: Sat Aug 1 22:46:32 2020 +0530
HBASE-24695 FSHLog - close the current WAL file in a background thread. (#2183)
Signed-off-by: Ramkrishna <ra...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/regionserver/wal/FSHLog.java | 72 +++++++++++++++++-----
1 file changed, 56 insertions(+), 16 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 902d354..168cbed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -35,6 +35,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The default implementation of FSWAL.
@@ -115,6 +118,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
private static final int DEFAULT_MAX_BATCH_COUNT = 200;
+ private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
+ private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
+
/**
* The nexus at which all incoming handlers meet. Does appends and sync with an ordering. Appends
* and syncs are each put on the ring which means handlers need to smash up against the ring twice
@@ -160,6 +166,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger();
+ private final int waitOnShutdownInSeconds;
+ private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+
/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
* using our logger instead of java native logger.
@@ -224,7 +234,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
-
+ this.waitOnShutdownInSeconds = conf.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS,
+ DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
@@ -355,23 +366,22 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
- // TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) {
oldFileLen = this.writer.getLength();
- try {
- TraceUtil.addTimelineAnnotation("closing writer");
- this.writer.close();
- TraceUtil.addTimelineAnnotation("writer closed");
- this.closeErrorCount.set(0);
- } catch (IOException ioe) {
- int errors = closeErrorCount.incrementAndGet();
- if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
- LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
- + "\", errors=" + errors
- + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
- } else {
- throw ioe;
- }
+ // In case of having unflushed entries or we already reached the
+ // closeErrorsTolerated count, call the closeWriter inline rather than in async
+ // way so that in case of an IOE we will throw it back and abort RS.
+ if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
+ closeWriter(this.writer, oldPath, true);
+ } else {
+ Writer localWriter = this.writer;
+ closeExecutor.execute(() -> {
+ try {
+ closeWriter(localWriter, oldPath, false);
+ } catch (IOException e) {
+ // We will never reach here.
+ }
+ });
}
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
@@ -413,6 +423,42 @@ public class FSHLog extends AbstractFSWAL<Writer> {
 }
 }
+  /**
+   * Closes the given WAL writer and maintains {@code closeErrorCount}.
+   * <p>
+   * {@code closeErrorCount} counts <em>consecutive</em> close failures: it is incremented on
+   * every failed close and reset to zero by a successful one. Without the reset the counter
+   * would only ever grow, so sporadic failures spread over the life of the server would
+   * eventually push every roll onto the inline close path and abort on the next failure.
+   * @param writer the writer to close
+   * @param path path of the WAL file behind {@code writer}; used only in log messages
+   * @param syncCloseCall {@code true} when closing inline during a roll, in which case a failure
+   *          is rethrown if there are unflushed entries or the failure count exceeds
+   *          {@code closeErrorsTolerated}; {@code false} when running on the background close
+   *          executor, in which case every failure is logged and swallowed
+   * @throws IOException only when {@code syncCloseCall} is {@code true} and the failure cannot
+   *           be ridden over
+   */
+  private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws IOException {
+    try {
+      TraceUtil.addTimelineAnnotation("closing writer");
+      writer.close();
+      TraceUtil.addTimelineAnnotation("writer closed");
+      // A successful close ends any run of failures.
+      closeErrorCount.set(0);
+    } catch (IOException ioe) {
+      int errors = closeErrorCount.incrementAndGet();
+      boolean hasUnflushedEntries = isUnflushedEntries();
+      if (syncCloseCall && (hasUnflushedEntries || (errors > this.closeErrorsTolerated))) {
+        LOG.error("Close of WAL " + path + " failed. Cause=\"" + ioe.getMessage() + "\", errors="
+            + errors + ", hasUnflushedEntries=" + hasUnflushedEntries);
+        throw ioe;
+      }
+      LOG.warn("Riding over failed WAL close of " + path
+          + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
+    }
+  }
+
@Override
protected void doShutdown() throws IOException {
// Shutdown the disruptor. Will stop after all entries have been processed. Make sure we
@@ -437,6 +465,18 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this.writer.close();
this.writer = null;
}
+ closeExecutor.shutdown();
+ try {
+ if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
+ LOG.error("We have waited {} seconds but the close of writer(s) doesn't complete."
+ + "Please check the status of underlying filesystem"
+ + " or increase the wait time by the config \"{}\"", this.waitOnShutdownInSeconds,
+ FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("The wait for termination of FSHLog writer(s) is interrupted");
+ Thread.currentThread().interrupt();
+ }
}
@Override