You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2020/10/12 16:27:56 UTC
[hbase] branch branch-2 updated: HBASE-25065 - WAL archival to be done by a separate thread (#2531)
This is an automated email from the ASF dual-hosted git repository.
ramkrishna pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ed7aa8e HBASE-25065 - WAL archival to be done by a separate thread (#2531)
ed7aa8e is described below
commit ed7aa8e3695e7564ade68adb77c1555bfda0b50a
Author: ramkrish86 <ra...@hotmail.com>
AuthorDate: Mon Oct 12 21:57:25 2020 +0530
HBASE-25065 - WAL archival to be done by a separate thread (#2531)
* HBASE-25065 - WAL archival to be done by a separate thread
* Fix checkstyle comments
* Fix compile issue
* Fix checkstyle and make the failing test more reliable
* Remove unused import
---
.../hadoop/hbase/regionserver/HRegionServer.java | 3 +-
.../hbase/regionserver/wal/AbstractFSWAL.java | 63 ++++++++++++++++++++--
.../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 17 ++++--
.../hadoop/hbase/regionserver/wal/FSHLog.java | 27 ++++++++--
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 7 ++-
.../hadoop/hbase/wal/AsyncFSWALProvider.java | 8 +--
.../hadoop/hbase/wal/DisabledWALProvider.java | 4 +-
.../apache/hadoop/hbase/wal/FSHLogProvider.java | 4 +-
.../hadoop/hbase/wal/RegionGroupingProvider.java | 5 +-
.../org/apache/hadoop/hbase/wal/WALFactory.java | 20 ++++---
.../org/apache/hadoop/hbase/wal/WALProvider.java | 5 +-
.../master/region/TestMasterRegionWALCleaner.java | 10 +---
.../regionserver/TestFailedAppendAndSync.java | 44 +++++++++++++--
.../regionserver/wal/AbstractTestLogRolling.java | 7 ++-
.../apache/hadoop/hbase/wal/IOTestProvider.java | 4 +-
15 files changed, 184 insertions(+), 44 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a80881e..d2b40f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -1897,7 +1898,7 @@ public class HRegionServer extends Thread implements
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
- WALFactory factory = new WALFactory(conf, serverName.toString());
+ WALFactory factory = new WALFactory(conf, serverName.toString(), (Server)this);
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 3b688d5..ee8790f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -86,6 +89,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@@ -181,6 +185,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
protected final Configuration conf;
+ protected final Abortable abortable;
+
/** Listeners that are called on WAL events. */
protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
@@ -313,6 +319,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
+ private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());
+
+ private final int archiveRetries;
+
public long getFilenum() {
return this.filenum.get();
}
@@ -364,10 +375,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
final boolean failIfWALExists, final String prefix, final String suffix)
throws FailedLogCloseException, IOException {
+ this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+ }
+
+ protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
+ final String logDir, final String archiveDir, final Configuration conf,
+ final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
+ final String suffix)
+ throws FailedLogCloseException, IOException {
this.fs = fs;
this.walDir = new Path(rootDir, logDir);
this.walArchiveDir = new Path(rootDir, archiveDir);
this.conf = conf;
+ this.abortable = abortable;
if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
throw new IOException("Unable to mkdir " + walDir);
@@ -464,6 +484,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
};
this.implClassName = getClass().getSimpleName();
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
+ archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
}
/**
@@ -672,11 +693,39 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
}
+
if (logsToArchive != null) {
- for (Pair<Path, Long> logAndSize : logsToArchive) {
- this.totalLogSize.addAndGet(-logAndSize.getSecond());
- archiveLogFile(logAndSize.getFirst());
- this.walFile2Props.remove(logAndSize.getFirst());
+ final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
+ // make it async
+ for (Pair<Path, Long> log : localLogsToArchive) {
+ logArchiveExecutor.execute(() -> {
+ archive(log);
+ });
+ this.walFile2Props.remove(log.getFirst());
+ }
+ }
+ }
+
+ protected void archive(final Pair<Path, Long> log) {
+ int retry = 1;
+ while (true) {
+ try {
+ archiveLogFile(log.getFirst());
+ totalLogSize.addAndGet(-log.getSecond());
+ // successful
+ break;
+ } catch (Throwable e) {
+ if (retry > archiveRetries) {
+ LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
+ if (this.abortable != null) {
+ this.abortable.abort("Failed log archiving", e);
+ break;
+ }
+ } else {
+ LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
+ e);
+ }
+ retry++;
}
}
}
@@ -689,7 +738,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return new Path(archiveDir, p.getName());
}
- private void archiveLogFile(final Path p) throws IOException {
+ @VisibleForTesting
+ protected void archiveLogFile(final Path p) throws IOException {
Path newPath = getWALArchivePath(this.walArchiveDir, p);
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
@@ -865,6 +915,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
rollWriterLock.lock();
try {
doShutdown();
+ if (logArchiveExecutor != null) {
+ logArchiveExecutor.shutdownNow();
+ }
} finally {
rollWriterLock.unlock();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3c799bf..66149a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
@@ -57,17 +59,17 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
+
/**
* An asynchronous implementation of FSWAL.
* <p>
@@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
- super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+ this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+ eventLoopGroup, channelClass);
+ }
+
+ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
+ String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+ boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
+ Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+ super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+ suffix);
this.eventLoopGroup = eventLoopGroup;
this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2227da7..f2ee0ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -40,10 +40,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* The default implementation of FSWAL.
*/
@@ -208,6 +210,19 @@ public class FSHLog extends AbstractFSWAL<Writer> {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
}
+ @VisibleForTesting
+ public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir,
+ final Configuration conf) throws IOException {
+ this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
+ null);
+ }
+
+ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
+ final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
+ final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
+ this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+ }
+
/**
* Create an edit log at the given <code>dir</code> location. You should never have to load an
* existing log. If there is a log at startup, it should have already been processed and deleted
@@ -226,10 +241,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
* @param suffix will be url encoded. null is treated as empty. non-empty must start with
* {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
*/
- public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
- final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
- final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
- super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+ public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir,
+ final String logDir, final String archiveDir, final Configuration conf,
+ final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
+ final String suffix) throws IOException {
+ super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+ suffix);
this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
CommonFSUtils.getDefaultReplication(fs, this.walDir));
this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index d3bb0d9..6fcebb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -28,10 +28,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@@ -88,6 +90,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
protected AtomicBoolean initialized = new AtomicBoolean(false);
// for default wal provider, logPrefix won't change
protected String logPrefix;
+ protected Abortable abortable;
/**
* We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
@@ -102,7 +105,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* null
*/
@Override
- public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+ public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+ throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
@@ -119,6 +123,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
}
logPrefix = sb.toString();
+ this.abortable = abortable;
doInit(conf);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 88b0140..377f6a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -67,11 +67,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
private Class<? extends Channel> channelClass;
@Override
protected AsyncFSWAL createWAL() throws IOException {
- return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
- getWALDirectoryName(factory.factoryId),
+ return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
+ CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
- META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
- eventLoopGroup, channelClass);
+ META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+ channelClass);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0ff2195..6c215f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -55,7 +56,8 @@ class DisabledWALProvider implements WALProvider {
WAL disabled;
@Override
- public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+ public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+ throws IOException {
if (null != disabled) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 3b91c24..8f2ca07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
@Override
protected FSHLog createWAL() throws IOException {
- return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
- getWALDirectoryName(factory.factoryId),
+ return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable,
+ CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 2fd8288..4a2c220 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
// imports for classes still in regionserver.wal
@@ -137,7 +139,8 @@ public class RegionGroupingProvider implements WALProvider {
private Class<? extends WALProvider> providerClass;
@Override
- public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+ public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+ throws IOException {
if (null != strategy) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 30bb77e..3b7f311 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -21,9 +21,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
@@ -37,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
* Entry point for users of the Write Ahead Log.
* Acts as the shim between internal use and the particular WALProvider we use to handle wal
@@ -86,6 +87,7 @@ public class WALFactory {
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
final String factoryId;
+ final Abortable abortable;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
// regionserver holds meta regions, then this ref will be non-null.
@@ -119,6 +121,7 @@ public class WALFactory {
// this instance can't create wals, just reader/writers.
provider = null;
factoryId = SINGLETON_ID;
+ this.abortable = null;
}
@VisibleForTesting
@@ -160,7 +163,7 @@ public class WALFactory {
LOG.info("Instantiating WALProvider of type " + clazz);
try {
final WALProvider result = clazz.getDeclaredConstructor().newInstance();
- result.init(this, conf, providerId);
+ result.init(this, conf, providerId, this.abortable);
return result;
} catch (Exception e) {
LOG.error("couldn't set up WALProvider, the configured class is " + clazz);
@@ -180,13 +183,17 @@ public class WALFactory {
return provider;
}
+ @VisibleForTesting
+ public WALFactory(Configuration conf, String factoryId) throws IOException {
+ this(conf, factoryId, null);
+ }
+
/**
* @param conf must not be null, will keep a reference to read params in later reader/writer
* instances.
- * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
- * to make a directory
+ * @param abortable the server to abort
*/
- public WALFactory(Configuration conf, String factoryId) throws IOException {
+ public WALFactory(Configuration conf, String factoryId, Abortable abortable) throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
@@ -195,6 +202,7 @@ public class WALFactory {
AbstractFSWALProvider.Reader.class);
this.conf = conf;
this.factoryId = factoryId;
+ this.abortable = abortable;
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
@@ -202,7 +210,7 @@ public class WALFactory {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
provider = new DisabledWALProvider();
- provider.init(this, conf, factoryId);
+ provider.init(this, conf, factoryId, null);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index c3bd149..0a3123a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -23,7 +23,9 @@ import java.io.IOException;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -46,7 +48,8 @@ public interface WALProvider {
* @param conf may not be null
* @param providerId differentiate between providers from one factory. may be null
*/
- void init(WALFactory factory, Configuration conf, String providerId) throws IOException;
+ void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+ throws IOException;
/**
* @param region the region which we want to get a WAL for it. Could be null.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
index 08b5f99..fb1d0ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
@@ -19,11 +19,9 @@ package org.apache.hadoop.hbase.master.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -88,12 +86,8 @@ public class TestMasterRegionWALCleaner extends MasterRegionTestBase {
region.requestRollAll();
region.waitUntilWalRollFinished();
// should have one
- FileStatus[] files = fs.listStatus(globalWALArchiveDir);
- assertEquals(1, files.length);
- Thread.sleep(2000);
- // should still be there
- assertTrue(fs.exists(files[0].getPath()));
- Thread.sleep(6000);
+
+ Thread.sleep(9000);
// should have been cleaned
assertEquals(0, fs.listStatus(globalWALArchiveDir).length);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 67e3134..a37f1f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
@@ -116,10 +118,11 @@ public class TestFailedAppendAndSync {
class DodgyFSLog extends FSHLog {
volatile boolean throwSyncException = false;
volatile boolean throwAppendException = false;
+ volatile boolean throwArchiveException = false;
- public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
- throws IOException {
- super(fs, root, logDir, conf);
+ public DodgyFSLog(FileSystem fs, Abortable abortable, Path root, String logDir,
+ Configuration conf) throws IOException {
+ super(fs, abortable, root, logDir, conf);
}
@Override
@@ -131,6 +134,18 @@ public class TestFailedAppendAndSync {
}
@Override
+ protected void archiveLogFile(Path p) throws IOException {
+ if (throwArchiveException) {
+ throw new IOException("throw archival exception");
+ }
+ }
+
+ @Override
+ protected void archive(Pair<Path, Long> localLogsToArchive) {
+ super.archive(localLogsToArchive);
+ }
+
+ @Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@@ -177,7 +192,7 @@ public class TestFailedAppendAndSync {
// the test.
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + getName());
- DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+ DodgyFSLog dodgyWAL = new DodgyFSLog(fs, services, rootDir, getName(), CONF);
dodgyWAL.init();
LogRoller logRoller = new LogRoller(services);
logRoller.addWAL(dodgyWAL);
@@ -253,6 +268,27 @@ public class TestFailedAppendAndSync {
Threads.sleep(1);
}
}
+
+ try {
+ dodgyWAL.throwAppendException = false;
+ dodgyWAL.throwSyncException = false;
+ dodgyWAL.throwArchiveException = true;
+ Pair<Path, Long> pair = new Pair<Path, Long>();
+ pair.setFirst(new Path("/a/b/"));
+ pair.setSecond(100L);
+ dodgyWAL.archive(pair);
+ } catch (Throwable ioe) {
+ }
+ while (true) {
+ try {
+ // one more abort needs to be called
+ Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(),
+ (Throwable) Mockito.anyObject());
+ break;
+ } catch (WantedButNotInvoked t) {
+ Threads.sleep(1);
+ }
+ }
} finally {
// To stop logRoller, its server has to say it is stopped.
Mockito.when(services.isStopped()).thenReturn(true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 83bd9ab..44ca988 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling {
}
}
- private void assertLogFileSize(WAL log) {
+ private void assertLogFileSize(WAL log) throws InterruptedException {
if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
} else {
+ for (int i = 0; i < 10; i++) {
+ if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
+ Thread.sleep(10);
+ }
+ }
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 384a293..e624e6f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
// imports for things that haven't moved from regionserver.wal yet.
@@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider {
* null
*/
@Override
- public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+ public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+ throws IOException {
if (!initialized.compareAndSet(false, true)) {
throw new IllegalStateException("WALProvider.init should only be called once.");
}