You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/05/02 01:25:54 UTC
[hbase] branch master updated: HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 47b4ab7 HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
47b4ab7 is described below
commit 47b4ab7b9732b790b2b471c489f670093e64ad2c
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Tue Apr 30 15:23:08 2019 -0700
HBASE-22301 Consider rolling the WAL if the HDFS write pipeline is slow
---
.../hbase/regionserver/wal/MetricsWALSource.java | 19 ++-
.../regionserver/wal/MetricsWALSourceImpl.java | 30 +++-
.../hadoop/hbase/regionserver/LogRoller.java | 2 +-
.../hbase/regionserver/wal/AbstractFSWAL.java | 127 +++++++++++---
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 22 +--
.../hadoop/hbase/regionserver/wal/FSHLog.java | 65 ++++---
.../hadoop/hbase/regionserver/wal/MetricsWAL.java | 19 ++-
.../hbase/regionserver/wal/WALActionsListener.java | 14 +-
.../hadoop/hbase/wal/DisabledWALProvider.java | 2 +-
.../hbase/regionserver/wal/TestLogRolling.java | 186 ++++++++++++++++++++-
.../hbase/regionserver/wal/TestMetricsWAL.java | 16 +-
11 files changed, 427 insertions(+), 75 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
index fbe95f5..7dc27f6 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java
@@ -60,10 +60,19 @@ public interface MetricsWALSource extends BaseSource {
String SYNC_TIME = "syncTime";
String SYNC_TIME_DESC = "The time it took to sync the WAL to HDFS.";
String ROLL_REQUESTED = "rollRequest";
- String ROLL_REQUESTED_DESC = "How many times a log roll has been requested total";
+ String ROLL_REQUESTED_DESC = "How many times a roll has been requested total";
+ String ERROR_ROLL_REQUESTED = "errorRollRequest";
+ String ERROR_ROLL_REQUESTED_DESC =
+ "How many times a roll was requested due to I/O or other errors.";
String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest";
String LOW_REPLICA_ROLL_REQUESTED_DESC =
- "How many times a log roll was requested due to too few DN's in the write pipeline.";
+ "How many times a roll was requested due to too few datanodes in the write pipeline.";
+ String SLOW_SYNC_ROLL_REQUESTED = "slowSyncRollRequest";
+ String SLOW_SYNC_ROLL_REQUESTED_DESC =
+ "How many times a roll was requested due to sync too slow on the write pipeline.";
+ String SIZE_ROLL_REQUESTED = "sizeRollRequest";
+ String SIZE_ROLL_REQUESTED_DESC =
+ "How many times a roll was requested due to file size roll threshold.";
String WRITTEN_BYTES = "writtenBytes";
String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL.";
@@ -94,10 +103,16 @@ public interface MetricsWALSource extends BaseSource {
void incrementLogRollRequested();
+ void incrementErrorLogRoll();
+
void incrementLowReplicationLogRoll();
long getSlowAppendCount();
+ void incrementSlowSyncLogRoll();
+
+ void incrementSizeLogRoll();
+
void incrementWrittenBytes(long val);
long getWrittenBytes();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
index 2f35d4c..eb605c5 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java
@@ -38,7 +38,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
private final MutableFastCounter appendCount;
private final MutableFastCounter slowAppendCount;
private final MutableFastCounter logRollRequested;
- private final MutableFastCounter lowReplicationLogRollRequested;
+ private final MutableFastCounter errorRollRequested;
+ private final MutableFastCounter lowReplicationRollRequested;
+ private final MutableFastCounter slowSyncRollRequested;
+ private final MutableFastCounter sizeRollRequested;
private final MutableFastCounter writtenBytes;
public MetricsWALSourceImpl() {
@@ -60,8 +63,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
syncTimeHisto = this.getMetricsRegistry().newTimeHistogram(SYNC_TIME, SYNC_TIME_DESC);
logRollRequested =
this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L);
- lowReplicationLogRollRequested = this.getMetricsRegistry()
+ errorRollRequested = this.getMetricsRegistry()
+ .newCounter(ERROR_ROLL_REQUESTED, ERROR_ROLL_REQUESTED_DESC, 0L);
+ lowReplicationRollRequested = this.getMetricsRegistry()
.newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L);
+ slowSyncRollRequested = this.getMetricsRegistry()
+ .newCounter(SLOW_SYNC_ROLL_REQUESTED, SLOW_SYNC_ROLL_REQUESTED_DESC, 0L);
+ sizeRollRequested = this.getMetricsRegistry()
+ .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L);
}
@@ -96,8 +105,23 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
}
@Override
+ public void incrementErrorLogRoll() {
+ errorRollRequested.incr();
+ }
+
+ @Override
public void incrementLowReplicationLogRoll() {
- lowReplicationLogRollRequested.incr();
+ lowReplicationRollRequested.incr();
+ }
+
+ @Override
+ public void incrementSlowSyncLogRoll() {
+ slowSyncRollRequested.incr();
+ }
+
+ @Override
+ public void incrementSizeLogRoll() {
+ sizeRollRequested.incr();
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 05a8fdf..8adb9b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -73,7 +73,7 @@ public class LogRoller extends HasThread implements Closeable {
if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
- public void logRollRequested(boolean lowReplicas) {
+ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
walNeedsRoll.put(wal, Boolean.TRUE);
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized(rollLog) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 6c8cbfa..ad2eec6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -116,14 +119,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
/** Don't log blocking regions more frequently than this. */
private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
- private static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.hlog.slowsync.ms";
+ protected static final String SLOW_SYNC_TIME_MS ="hbase.regionserver.wal.slowsync.ms";
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
-
- private static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms";
+ protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
+ protected static final String SLOW_SYNC_ROLL_THRESHOLD =
+ "hbase.regionserver.wal.slowsync.roll.threshold";
+ protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
+ protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
+ "hbase.regionserver.wal.slowsync.roll.interval.ms";
+ protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
- private static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout";
- private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+ protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
+ protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
/**
* file system instance
@@ -179,6 +187,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
/** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */
protected final long slowSyncNs, rollOnSyncNs;
+ protected final int slowSyncRollThreshold;
+ protected final int slowSyncCheckInterval;
+ protected final AtomicInteger slowSyncCount = new AtomicInteger();
private final long walSyncTimeoutNs;
@@ -237,7 +248,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
volatile W writer;
// Last time to check low replication on hlog's pipeline
- private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+ private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
+
+ // Last time we asked to roll the log due to a slow sync
+ private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
protected volatile boolean closed = false;
@@ -301,6 +315,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
protected final String implClassName;
+ protected volatile boolean rollRequested;
+
public long getFilenum() {
return this.filenum.get();
}
@@ -435,13 +451,16 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir);
- this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(
- conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
- this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(
- conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
- this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
- conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
-
+ this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
+ DEFAULT_SLOW_SYNC_TIME_MS));
+ this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
+ DEFAULT_ROLL_ON_SYNC_TIME_MS));
+ this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
+ DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
+ this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
+ DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
+ this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
+ DEFAULT_WAL_SYNC_TIMEOUT_MS));
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
@Override
protected SyncFuture initialValue() {
@@ -814,11 +833,16 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
tellListenersAboutPreLogRoll(oldPath, newPath);
// NewPath could be equal to oldPath if replaceWriter fails.
newPath = replaceWriter(oldPath, newPath, nextWriter);
+ // Reset rollRequested status
+ rollRequested = false;
tellListenersAboutPostLogRoll(oldPath, newPath);
if (LOG.isDebugEnabled()) {
LOG.debug("Create new " + implClassName + " writer with pipeline: " +
Arrays.toString(getPipeline()));
}
+ // We got a new writer, so reset the slow sync count
+ lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
+ slowSyncCount.set(0);
// Can we delete any of the old log files?
if (getNumRolledLogFiles() > 0) {
cleanOldLogs();
@@ -845,7 +869,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// public only until class moves to o.a.h.h.wal
public void requestLogRoll() {
- requestLogRoll(false);
+ requestLogRoll(ERROR);
}
/**
@@ -925,10 +949,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return cachedSyncFutures.get().reset(sequence);
}
- protected final void requestLogRoll(boolean tooFewReplicas) {
+ protected boolean isLogRollRequested() {
+ return rollRequested;
+ }
+
+ protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
+ // If we have already requested a roll, don't do it again
+ if (rollRequested) {
+ return;
+ }
if (!this.listeners.isEmpty()) {
+ rollRequested = true; // No point to assert this unless there is a registered listener
for (WALActionsListener i : this.listeners) {
- i.logRollRequested(tooFewReplicas);
+ i.logRollRequested(reason);
}
}
}
@@ -997,23 +1030,32 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return len;
}
- protected final boolean postSync(long timeInNanos, int handlerSyncs) {
+ protected final void postSync(long timeInNanos, int handlerSyncs) {
if (timeInNanos > this.slowSyncNs) {
- String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000)
- .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString();
+ String msg = new StringBuilder().append("Slow sync cost: ")
+ .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
+ .append(" ms, current pipeline: ")
+ .append(Arrays.toString(getPipeline())).toString();
TraceUtil.addTimelineAnnotation(msg);
LOG.info(msg);
+ if (timeInNanos > this.rollOnSyncNs) {
+ // A single sync took too long.
+ // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
+ // effects. Here we have a single data point that indicates we should take immediate
+ // action, so do so.
+ LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
+ TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
+ TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
+ Arrays.toString(getPipeline()));
+ requestLogRoll(SLOW_SYNC);
+ }
+ slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
}
if (!listeners.isEmpty()) {
for (WALActionsListener listener : listeners) {
listener.postSync(timeInNanos, handlerSyncs);
}
}
- if (timeInNanos > this.rollOnSyncNs) {
- LOG.info("Trying to request a roll due to a very long sync ({} ms)", timeInNanos / 1000000);
- return true;
- }
- return false;
}
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
@@ -1093,6 +1135,41 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract boolean doCheckLogLowReplication();
+ /**
+ * @return true if we exceeded the slow sync roll threshold over the last check
+ * interval
+ */
+ protected boolean doCheckSlowSync() {
+ boolean result = false;
+ long now = EnvironmentEdgeManager.currentTime();
+ long elapsedTime = now - lastTimeCheckSlowSync;
+ if (elapsedTime >= slowSyncCheckInterval) {
+ if (slowSyncCount.get() >= slowSyncRollThreshold) {
+ if (elapsedTime >= (2 * slowSyncCheckInterval)) {
+ // If two or more slowSyncCheckInterval have elapsed this is a corner case
+ // where a train of slow syncs almost triggered us but then there was a long
+ // interval from then until the one more that pushed us over. If so, we
+ // should do nothing and let the count reset.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
+ "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+ ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
+ slowSyncCheckInterval + " ms");
+ }
+ // Fall through to count reset below
+ } else {
+ LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
+ slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
+ ", current pipeline: " + Arrays.toString(getPipeline()));
+ result = true;
+ }
+ }
+ lastTimeCheckSlowSync = now;
+ slowSyncCount.set(0);
+ }
+ return result;
+ }
+
public void checkLogLowReplication(long checkInterval) {
long now = EnvironmentEdgeManager.currentTime();
if (now - lastTimeCheckLowReplication < checkInterval) {
@@ -1105,7 +1182,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
try {
lastTimeCheckLowReplication = now;
if (doCheckLogLowReplication()) {
- requestLogRoll(true);
+ requestLogRoll(LOW_REPLICATION);
}
} finally {
rollWriterLock.unlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 553ff3d..209ace6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.lmax.disruptor.RingBuffer;
@@ -167,9 +169,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// notice that, modification to this field is only allowed under the protection of consumeLock.
private volatile int epochAndState;
- // used to guard the log roll request when we exceed the log roll size.
- private boolean rollRequested;
-
private boolean readyForRolling;
private final Condition readyForRollingCond = consumeLock.newCondition();
@@ -317,7 +316,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
highestUnsyncedTxid = highestSyncedTxid.get();
if (shouldRequestLogRoll) {
// request a roll.
- requestLogRoll();
+ requestLogRoll(ERROR);
}
}
@@ -330,18 +329,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
break;
}
}
-
- boolean doRequestRoll = postSync(System.nanoTime() - startTimeNs, finishSync(true));
+ postSync(System.nanoTime() - startTimeNs, finishSync(true));
if (trySetReadyForRolling()) {
// we have just finished a roll, then do not need to check for log rolling, the writer will be
// closed soon.
return;
}
- if ((!doRequestRoll && writer.getLength() < logrollsize) || rollRequested) {
- return;
+ // If we haven't already requested a roll, check if we have exceeded logrollsize
+ if (!isLogRollRequested() && writer.getLength() > logrollsize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requesting log roll because of file size threshold; length=" +
+ writer.getLength() + ", logrollsize=" + logrollsize);
+ }
+ requestLogRoll(SIZE);
}
- rollRequested = true;
- requestLogRoll();
}
private void sync(AsyncWriter writer) {
@@ -716,7 +717,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = nextWriter.getLength();
- this.rollRequested = false;
this.highestProcessedAppendTxidAtLastSync = 0L;
consumeLock.lock();
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 68cd338..44e919b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
+
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
@@ -257,10 +262,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
long startTimeNanos = System.nanoTime();
try {
nextWriter.sync(useHsync);
- boolean doRequestRoll = postSync(System.nanoTime() - startTimeNanos, 0);
- if (doRequestRoll) {
- LOG.info("Ignoring a roll request after a sync for a new file");
- }
+ postSync(System.nanoTime() - startTimeNanos, 0);
} catch (IOException e) {
// optimization failed, no need to abort here.
LOG.warn("pre-sync failed but an optimization so keep going", e);
@@ -600,16 +602,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Can we release other syncs?
syncCount += releaseSyncFutures(currentSequence, lastException);
if (lastException != null) {
- wasRollRequested = true;
- requestLogRoll();
+ requestLogRoll(ERROR);
} else {
- wasRollRequested = checkLogRoll();
+ checkLogRoll();
}
}
- boolean doRequestRoll = postSync(System.nanoTime() - start, syncCount);
- if (!wasRollRequested && doRequestRoll) {
- requestLogRoll();
- }
+ postSync(System.nanoTime() - start, syncCount);
} catch (InterruptedException e) {
// Presume legit interrupt.
Thread.currentThread().interrupt();
@@ -624,20 +622,35 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* Schedule a log roll if needed.
*/
private boolean checkLogRoll() {
+ // If we have already requested a roll, do nothing
+ if (isLogRollRequested()) {
+ return false;
+ }
// Will return immediately if we are in the middle of a WAL log roll currently.
if (!rollWriterLock.tryLock()) {
return false;
}
- boolean lowReplication;
try {
- lowReplication = doCheckLogLowReplication();
+ if (doCheckLogLowReplication()) {
+ LOG.warn("Requesting log roll because of low replication, current pipeline: " +
+ Arrays.toString(getPipeline()));
+ requestLogRoll(LOW_REPLICATION);
+ return true;
+ } else if (writer != null && writer.getLength() > logrollsize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requesting log roll because of file size threshold; length=" +
+ writer.getLength() + ", logrollsize=" + logrollsize);
+ }
+ requestLogRoll(SIZE);
+ return true;
+ } else if (doCheckSlowSync()) {
+ // We log this already in checkSlowSync
+ requestLogRoll(SLOW_SYNC);
+ return true;
+ }
} finally {
rollWriterLock.unlock();
}
- if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
- requestLogRoll(lowReplication);
- return true;
- }
return false;
}
@@ -774,8 +787,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
public static final long FIXED_OVERHEAD = ClassSize
- .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
- + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
+ .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + (2 * ClassSize.ATOMIC_INTEGER)
+ + (3 * Bytes.SIZEOF_INT) + (4 * Bytes.SIZEOF_LONG));
/**
* This class is used coordinating two threads holding one thread at a 'safe point' while the
@@ -1023,7 +1036,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this.syncFuturesCount.get());
} catch (Exception e) {
// Should NEVER get here.
- requestLogRoll();
+ requestLogRoll(ERROR);
this.exception = new DamagedWALException("Failed offering sync", e);
}
}
@@ -1090,7 +1103,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
+ ", requesting roll of WAL";
LOG.warn(msg, e);
- requestLogRoll();
+ requestLogRoll(ERROR);
throw new DamagedWALException(msg, e);
}
}
@@ -1122,4 +1135,14 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
return new DatanodeInfo[0];
}
+
+ @VisibleForTesting
+ Writer getWriter() {
+ return this.writer;
+ }
+
+ @VisibleForTesting
+ void setWriter(Writer writer) {
+ this.writer = writer;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
index 900e55f..b2af4a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java
@@ -73,10 +73,23 @@ public class MetricsWAL implements WALActionsListener {
}
@Override
- public void logRollRequested(boolean underReplicated) {
+ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
source.incrementLogRollRequested();
- if (underReplicated) {
- source.incrementLowReplicationLogRoll();
+ switch (reason) {
+ case ERROR:
+ source.incrementErrorLogRoll();
+ break;
+ case LOW_REPLICATION:
+ source.incrementLowReplicationLogRoll();
+ break;
+ case SIZE:
+ source.incrementSizeLogRoll();
+ break;
+ case SLOW_SYNC:
+ source.incrementSlowSyncLogRoll();
+ break;
+ default:
+ break;
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 13ffac7..7fba7df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -32,6 +32,18 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface WALActionsListener {
+ /** The reason for the log roll request. */
+ static enum RollRequestReason {
+ /** The length of the log exceeds the roll size threshold. */
+ SIZE,
+ /** Too few replicas in the writer pipeline. */
+ LOW_REPLICATION,
+ /** Too much time spent waiting for sync. */
+ SLOW_SYNC,
+ /** I/O or other error. */
+ ERROR
+ };
+
/**
* The WAL is going to be rolled. The oldPath can be null if this is
* the first log file from the regionserver.
@@ -65,7 +77,7 @@ public interface WALActionsListener {
/**
* A request was made that the WAL be rolled.
*/
- default void logRollRequested(boolean tooFewReplicas) {}
+ default void logRollRequested(RollRequestReason reason) {}
/**
* The WAL is about to close.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 75439fe..5f787fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -119,7 +119,7 @@ class DisabledWALProvider implements WALProvider {
public byte[][] rollWriter() {
if (!listeners.isEmpty()) {
for (WALActionsListener listener : listeners) {
- listener.logRollRequested(false);
+ listener.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
}
for (WALActionsListener listener : listeners) {
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index e19361e..6c257e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -53,7 +55,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
@@ -91,6 +95,178 @@ public class TestLogRolling extends AbstractTestLogRolling {
conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
conf.set(WALFactory.WAL_PROVIDER, "filesystem");
AbstractTestLogRolling.setUpBeforeClass();
+
+ // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+ TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+ TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+ // For slow sync threshold test: roll once after any single sync slower than this (ms)
+ TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
+ }
+
+ @Test
+ public void testSlowSyncLogRolling() throws Exception {
+ // Create the test table
+ TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+ admin.createTable(desc);
+ Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+ int row = 1;
+ try {
+ // Get a reference to the FSHLog
+ server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+ RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+ final FSHLog log = (FSHLog) server.getWAL(region);
+
+ // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+
+ final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+ log.registerWALActionsListener(new WALActionsListener() {
+ @Override
+ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+ switch (reason) {
+ case SLOW_SYNC:
+ slowSyncHookCalled.lazySet(true);
+ break;
+ default:
+ break;
+ }
+ }
+ });
+
+ // Write some data
+
+ for (int i = 0; i < 10; i++) {
+ writeData(table, row++);
+ }
+
+ assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+ slowSyncHookCalled.get());
+
+ // Reset the SLOW_SYNC hook flag before the next phase of the test
+ slowSyncHookCalled.set(false);
+
+ // Wrap the current writer with the anonymous class below that adds 200 ms of
+ // latency to any sync on the hlog. This should be more than sufficient to trigger
+ // slow sync warnings.
+ final Writer oldWriter1 = log.getWriter();
+ final Writer newWriter1 = new Writer() {
+ @Override
+ public void close() throws IOException {
+ oldWriter1.close();
+ }
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ InterruptedIOException ex = new InterruptedIOException();
+ ex.initCause(e);
+ throw ex;
+ }
+ oldWriter1.sync(forceSync);
+ }
+ @Override
+ public void append(Entry entry) throws IOException {
+ oldWriter1.append(entry);
+ }
+ @Override
+ public long getLength() {
+ return oldWriter1.getLength();
+ }
+ };
+ log.setWriter(newWriter1);
+
+ // Write some data.
+ // We need to write at least 5 times, but double it. We should only request
+ // a SLOW_SYNC roll once in the current interval.
+ for (int i = 0; i < 10; i++) {
+ writeData(table, row++);
+ }
+
+ // Wait for our latency-injecting writer to get rolled out, as needed.
+
+ TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return log.getWriter() != newWriter1;
+ }
+ @Override
+ public String explainFailure() throws Exception {
+ return "Waited too long for our test writer to get rolled out";
+ }
+ });
+
+ assertTrue("Should have triggered log roll due to SLOW_SYNC",
+ slowSyncHookCalled.get());
+
+ // Reset the SLOW_SYNC hook flag before the next phase of the test
+ slowSyncHookCalled.set(false);
+
+ // Wrap the current writer with the anonymous class below that adds 5000 ms of
+ // latency to any sync on the hlog.
+ // This will trip the other threshold (a single sync exceeding ROLL_ON_SYNC_TIME_MS).
+ final Writer oldWriter2 = (Writer)log.getWriter();
+ final Writer newWriter2 = new Writer() {
+ @Override
+ public void close() throws IOException {
+ oldWriter2.close();
+ }
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ InterruptedIOException ex = new InterruptedIOException();
+ ex.initCause(e);
+ throw ex;
+ }
+ oldWriter2.sync(forceSync);
+ }
+ @Override
+ public void append(Entry entry) throws IOException {
+ oldWriter2.append(entry);
+ }
+ @Override
+ public long getLength() {
+ return oldWriter2.getLength();
+ }
+ };
+ log.setWriter(newWriter2);
+
+ // Write some data. Should only take one sync.
+
+ writeData(table, row++);
+
+ // Wait for our latency-injecting writer to get rolled out, as needed.
+
+ TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return log.getWriter() != newWriter2;
+ }
+ @Override
+ public String explainFailure() throws Exception {
+ return "Waited too long for our test writer to get rolled out";
+ }
+ });
+
+ assertTrue("Should have triggered log roll due to SLOW_SYNC",
+ slowSyncHookCalled.get());
+
+ // Reset the SLOW_SYNC hook flag before the next phase of the test
+ slowSyncHookCalled.set(false);
+
+ // Write some data
+ for (int i = 0; i < 10; i++) {
+ writeData(table, row++);
+ }
+
+ assertFalse("Should not have triggered log roll due to SLOW_SYNC",
+ slowSyncHookCalled.get());
+
+ } finally {
+ table.close();
+ }
}
void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
@@ -147,9 +323,13 @@ public class TestLogRolling extends AbstractTestLogRolling {
log.registerWALActionsListener(new WALActionsListener() {
@Override
- public void logRollRequested(boolean lowReplication) {
- if (lowReplication) {
- lowReplicationHookCalled.lazySet(true);
+ public void logRollRequested(WALActionsListener.RollRequestReason reason) {
+ switch (reason) {
+ case LOW_REPLICATION:
+ lowReplicationHookCalled.lazySet(true);
+ break;
+ default:
+ break;
}
}
});
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
index c0d3416..1c324dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java
@@ -41,13 +41,21 @@ public class TestMetricsWAL {
public void testLogRollRequested() throws Exception {
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
MetricsWAL metricsWAL = new MetricsWAL(source);
- metricsWAL.logRollRequested(false);
- metricsWAL.logRollRequested(true);
+ metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.ERROR);
+ metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.LOW_REPLICATION);
+ metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SLOW_SYNC);
+ metricsWAL.logRollRequested(WALActionsListener.RollRequestReason.SIZE);
- // Log roll was requested twice
- verify(source, times(2)).incrementLogRollRequested();
+ // Log roll was requested four times
+ verify(source, times(4)).incrementLogRollRequested();
+ // One was because of an IO error.
+ verify(source, times(1)).incrementErrorLogRoll();
// One was because of low replication on the hlog.
verify(source, times(1)).incrementLowReplicationLogRoll();
+ // One was because of slow sync on the hlog.
+ verify(source, times(1)).incrementSlowSyncLogRoll();
+ // One was because of the hlog file length limit.
+ verify(source, times(1)).incrementSizeLogRoll();
}
@Test