You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2020/10/11 05:18:20 UTC

[hbase] branch master updated: HBASE-25065 WAL archival to be done by a separate thread (#2501)

This is an automated email from the ASF dual-hosted git repository.

ramkrishna pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new accd975  HBASE-25065 WAL archival to be done by a separate thread (#2501)
accd975 is described below

commit accd9750aa217c40e9db641c53905b8f4bb7e66d
Author: ramkrish86 <ra...@hotmail.com>
AuthorDate: Sun Oct 11 10:46:06 2020 +0530

    HBASE-25065 WAL archival to be done by a separate thread (#2501)
    
    * HBASE-25065 WAL archival can be batched/throttled and also done by a separate thread
    
    * Fix checkstyle issues
    
    * Address review comments
    
    * checkstyle comments
    
    * Addressing final review comments
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../hadoop/hbase/master/region/MasterRegion.java   |  2 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  2 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      | 69 ++++++++++++++++++++--
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  | 15 ++++-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 30 ++++++++--
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  8 ++-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |  8 +--
 .../hadoop/hbase/wal/DisabledWALProvider.java      |  5 +-
 .../apache/hadoop/hbase/wal/FSHLogProvider.java    |  6 +-
 .../hadoop/hbase/wal/RegionGroupingProvider.java   |  9 ++-
 .../hbase/wal/SyncReplicationWALProvider.java      |  7 ++-
 .../org/apache/hadoop/hbase/wal/WALFactory.java    | 19 +++---
 .../org/apache/hadoop/hbase/wal/WALProvider.java   |  5 +-
 .../regionserver/TestFailedAppendAndSync.java      | 44 +++++++++++++-
 .../regionserver/wal/AbstractTestLogRolling.java   |  7 ++-
 .../apache/hadoop/hbase/wal/IOTestProvider.java    |  4 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |  2 +-
 17 files changed, 199 insertions(+), 43 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 81da59d..688a549 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -301,7 +301,7 @@ public final class MasterRegion {
       params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
     walRoller.start();
 
-    WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), false);
+    WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
     HRegion region;
     if (fs.exists(tableDir)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d6eb45f..d51eab4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1906,7 +1906,7 @@ public class HRegionServer extends Thread implements
     boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
         !LoadBalancer.isMasterCanHostUserRegions(conf);
     WALFactory factory =
-        new WALFactory(conf, serverName.toString(), !isMasterNoTableOrSystemTableOnly);
+        new WALFactory(conf, serverName.toString(), this, !isMasterNoTableOrSystemTableOnly);
     if (!isMasterNoTableOrSystemTableOnly) {
       // TODO Replication make assumptions here based on the default filesystem impl
       Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index d2c624a..ac99ea6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -84,8 +87,12 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+
+
 
 /**
  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
@@ -185,6 +192,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final Configuration conf;
 
+  protected final Abortable abortable;
+
   /** Listeners that are called on WAL events. */
   protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
 
@@ -329,6 +338,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
 
+  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archiver-%d").build());
+
+  private final int archiveRetries;
+
   public long getFilenum() {
     return this.filenum.get();
   }
@@ -380,10 +394,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
       final boolean failIfWALExists, final String prefix, final String suffix)
       throws FailedLogCloseException, IOException {
+    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+  }
+
+  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
+      final String logDir, final String archiveDir, final Configuration conf,
+      final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
+      final String suffix)
+      throws FailedLogCloseException, IOException {
     this.fs = fs;
     this.walDir = new Path(rootDir, logDir);
     this.walArchiveDir = new Path(rootDir, archiveDir);
     this.conf = conf;
+    this.abortable = abortable;
 
     if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
       throw new IOException("Unable to mkdir " + walDir);
@@ -482,6 +505,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
             SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
     this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
+    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
+
   }
 
   /**
@@ -715,11 +740,39 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         regionsBlockingThisWal.clear();
       }
     }
+
     if (logsToArchive != null) {
-      for (Pair<Path, Long> logAndSize : logsToArchive) {
-        this.totalLogSize.addAndGet(-logAndSize.getSecond());
-        archiveLogFile(logAndSize.getFirst());
-        this.walFile2Props.remove(logAndSize.getFirst());
+      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
+      // make it async
+      for (Pair<Path, Long> log : localLogsToArchive) {
+        logArchiveExecutor.execute(() -> {
+          archive(log);
+        });
+        this.walFile2Props.remove(log.getFirst());
+      }
+    }
+  }
+
+  protected void archive(final Pair<Path, Long> log) {
+    int retry = 1;
+    while (true) {
+      try {
+        archiveLogFile(log.getFirst());
+        totalLogSize.addAndGet(-log.getSecond());
+        // successful
+        break;
+      } catch (Throwable e) {
+        if (retry > archiveRetries) {
+          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
+          if (this.abortable != null) {
+            this.abortable.abort("Failed log archiving", e);
+          }
+          break; // retries exhausted: stop even if abortable is null, else we loop forever
+        } else {
+          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
+            e);
+        }
+        retry++;
       }
     }
   }
@@ -732,7 +785,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     return new Path(archiveDir, p.getName());
   }
 
-  private void archiveLogFile(final Path p) throws IOException {
+  @VisibleForTesting
+  protected void archiveLogFile(final Path p) throws IOException {
     Path newPath = getWALArchivePath(this.walArchiveDir, p);
     // Tell our listeners that a log is going to be archived.
     if (!this.listeners.isEmpty()) {
@@ -907,6 +961,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     rollWriterLock.lock();
     try {
       doShutdown();
+      if (logArchiveExecutor != null) {
+        logArchiveExecutor.shutdownNow();
+      }
     } finally {
       rollWriterLock.unlock();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index a40e503..3424460 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
@@ -60,7 +62,6 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
@@ -68,6 +69,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;
 
+
 /**
  * An asynchronous implementation of FSWAL.
  * <p>
@@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
-    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+        eventLoopGroup, channelClass);
+  }
+
+  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
+      String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+      boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix);
     this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
     Supplier<Boolean> hasConsumerTask;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 001be00..fe910aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -40,10 +40,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+
 /**
  * The default implementation of FSWAL.
  */
@@ -168,7 +170,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
 
   private final int waitOnShutdownInSeconds;
   private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
   /**
    * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
@@ -208,11 +210,25 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
   }
 
+  @VisibleForTesting
+  public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir,
+      final Configuration conf) throws IOException {
+    this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
+        null);
+  }
+
+  public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
+      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
+      final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
+    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+  }
+
   /**
    * Create an edit log at the given <code>dir</code> location. You should never have to load an
    * existing log. If there is a log at startup, it should have already been processed and deleted
    * by the time the WAL object is started up.
    * @param fs filesystem handle
+   * @param abortable Abortable - the server here
    * @param rootDir path to where logs and oldlogs
    * @param logDir dir where wals are stored
    * @param archiveDir dir where wals are archived
@@ -226,10 +242,12 @@ public class FSHLog extends AbstractFSWAL<Writer> {
    * @param suffix will be url encoded. null is treated as empty. non-empty must start with
    *          {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
    */
-  public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
-      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
-      final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
-    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
+  public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir,
+      final String logDir, final String archiveDir, final Configuration conf,
+      final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
+      final String suffix) throws IOException {
+    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
+        suffix);
     this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
       CommonFSUtils.getDefaultReplication(fs, this.walDir));
     this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index e7bdb0b..84c94e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,10 +30,12 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -88,6 +91,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   protected AtomicBoolean initialized = new AtomicBoolean(false);
   // for default wal provider, logPrefix won't change
   protected String logPrefix;
+  protected Abortable abortable;
 
   /**
    * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs
@@ -102,7 +106,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
    *          null
    */
   @Override
-  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+      throws IOException {
     if (!initialized.compareAndSet(false, true)) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
@@ -119,6 +124,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
       }
     }
     logPrefix = sb.toString();
+    this.abortable = abortable;
     doInit(conf);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 062b368..3a2ffa7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -65,11 +65,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
 
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
-    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId),
+    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
+        CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
         getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
-        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
-        eventLoopGroup, channelClass);
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+        channelClass);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0ff2195..6e5a053 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -25,8 +25,10 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -55,7 +57,8 @@ class DisabledWALProvider implements WALProvider {
   WAL disabled;
 
   @Override
-  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+      throws IOException {
     if (null != disabled) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 3b91c24..e64d70f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -67,7 +67,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
    * Public because of FSHLog. Should be package-private
    */
   public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path,
-    final boolean overwritable, long blocksize) throws IOException {
+      final boolean overwritable, long blocksize) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends Writer> logWriterClass =
         conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
@@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
 
   @Override
   protected FSHLog createWAL() throws IOException {
-    return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId),
+    return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable,
+        CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
         getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
         META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 764d3d5..20d043b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -137,14 +139,17 @@ public class RegionGroupingProvider implements WALProvider {
   private List<WALActionsListener> listeners = new ArrayList<>();
   private String providerId;
   private Class<? extends WALProvider> providerClass;
+  private Abortable abortable;
 
   @Override
-  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+      throws IOException {
     if (null != strategy) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
     this.conf = conf;
     this.factory = factory;
+    this.abortable = abortable;
 
     if (META_WAL_PROVIDER_ID.equals(providerId)) {
       // do not change the provider id if it is for meta
@@ -171,7 +176,7 @@ public class RegionGroupingProvider implements WALProvider {
   private WALProvider createProvider(String group) throws IOException {
     WALProvider provider = WALFactory.createProvider(providerClass);
     provider.init(factory, conf,
-      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group);
+      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group, this.abortable);
     provider.addWALActionsListener(new MetricsWAL());
     return provider;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9859c20..001e1a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -35,7 +35,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
@@ -108,11 +110,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   }
 
   @Override
-  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+      throws IOException {
     if (!initialized.compareAndSet(false, true)) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
-    provider.init(factory, conf, providerId);
+    provider.init(factory, conf, providerId, abortable);
     this.conf = conf;
     this.factory = factory;
     Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 26b8727..6a5feb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -21,9 +21,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
@@ -35,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -86,6 +87,7 @@ public class WALFactory {
   public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
 
   final String factoryId;
+  final Abortable abortable;
   private final WALProvider provider;
   // The meta updates are written to a different wal. If this
   // regionserver holds meta regions, then this ref will be non-null.
@@ -119,6 +121,7 @@ public class WALFactory {
     // this instance can't create wals, just reader/writers.
     provider = null;
     factoryId = SINGLETON_ID;
+    this.abortable = null;
   }
 
   @VisibleForTesting
@@ -175,7 +178,7 @@ public class WALFactory {
   public WALFactory(Configuration conf, String factoryId) throws IOException {
     // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
     // for HMaster or HRegionServer which take system table only. See HBASE-19999
-    this(conf, factoryId, true);
+    this(conf, factoryId, null, true);
   }
 
   /**
@@ -183,11 +186,12 @@ public class WALFactory {
    *          instances.
    * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
    *          to make a directory
+   * @param abortable the server associated with this WAL file
    * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
    *          {@link SyncReplicationWALProvider}
    */
-  public WALFactory(Configuration conf, String factoryId, boolean enableSyncReplicationWALProvider)
-      throws IOException {
+  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
+      boolean enableSyncReplicationWALProvider) throws IOException {
     // until we've moved reader/writer construction down into providers, this initialization must
     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
@@ -196,20 +200,21 @@ public class WALFactory {
       AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     this.factoryId = factoryId;
+    this.abortable = abortable;
     // end required early initialization
     if (conf.getBoolean(WAL_ENABLED, true)) {
       WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
       if (enableSyncReplicationWALProvider) {
         provider = new SyncReplicationWALProvider(provider);
       }
-      provider.init(this, conf, null);
+      provider.init(this, conf, null, this.abortable);
       provider.addWALActionsListener(new MetricsWAL());
       this.provider = provider;
     } else {
       // special handling of existing configuration behavior.
       LOG.warn("Running with WAL disabled.");
       provider = new DisabledWALProvider();
-      provider.init(this, conf, factoryId);
+      provider.init(this, conf, factoryId, null);
     }
   }
 
@@ -274,7 +279,7 @@ public class WALFactory {
         clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
       }
       provider = createProvider(clz);
-      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
       provider.addWALActionsListener(new MetricsWAL());
       if (metaProvider.compareAndSet(null, provider)) {
         return provider;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index c3bd149..01c1d11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -23,7 +23,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -46,7 +48,8 @@ public interface WALProvider {
    * @param conf may not be null
    * @param providerId differentiate between providers from one factory. may be null
    */
-  void init(WALFactory factory, Configuration conf, String providerId) throws IOException;
+  void init(WALFactory factory, Configuration conf, String providerId, Abortable server)
+      throws IOException; // server may be null: WALFactory passes null for DisabledWALProvider
 
   /**
    * @param region the region which we want to get a WAL for it. Could be null.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index a9ce548..fdf96da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
@@ -107,11 +110,13 @@ public class TestFailedAppendAndSync {
   class DodgyFSLog extends FSHLog {
     volatile boolean throwSyncException = false;
     volatile boolean throwAppendException = false;
+    volatile boolean throwArchiveException = false;
+
     final AtomicLong rolls = new AtomicLong(0);
 
-    public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+    public DodgyFSLog(FileSystem fs, Server server, Path root, String logDir, Configuration conf)
         throws IOException {
-      super(fs, root, logDir, conf);
+      super(fs, server, root, logDir, conf); // server is passed straight through to FSHLog
     }
 
     @Override
@@ -123,6 +128,18 @@ public class TestFailedAppendAndSync {
     }
 
     @Override
+    protected void archiveLogFile(Path p) throws IOException {
+      if (throwArchiveException) { // test hook: fail on demand; with the flag off this is a no-op
+        throw new IOException("throw archival exception");
+      }
+    }
+
+    @Override
+    protected void archive(Pair<Path, Long> localLogsToArchive) {
+      super.archive(localLogsToArchive); // delegates only; presumably overridden for test access
+    }
+
+    @Override
     protected Writer createWriterInstance(Path path) throws IOException {
       final Writer w = super.createWriterInstance(path);
       return new Writer() {
@@ -176,7 +193,7 @@ public class TestFailedAppendAndSync {
     // the test.
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + getName());
-    DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+    DodgyFSLog dodgyWAL = new DodgyFSLog(fs, (Server)services, rootDir, getName(), CONF);
     dodgyWAL.init();
     LogRoller logRoller = new LogRoller(services);
     logRoller.addWAL(dodgyWAL);
@@ -256,6 +273,27 @@ public class TestFailedAppendAndSync {
           Threads.sleep(1);
         }
       }
+
+      try {
+        dodgyWAL.throwAppendException = false;
+        dodgyWAL.throwSyncException = false;
+        dodgyWAL.throwArchiveException = true;
+        Pair<Path, Long> pair = new Pair<Path, Long>();
+        pair.setFirst(new Path("/a/b/"));
+        pair.setSecond(100L);
+        dodgyWAL.archive(pair);
+      } catch (Throwable ioe) {
+      }
+      while (true) {
+        try {
+          // one more abort needs to be called
+          Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(),
+            (Throwable) Mockito.anyObject());
+          break;
+        } catch (WantedButNotInvoked t) {
+          Threads.sleep(1);
+        }
+      }
     } finally {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(services.isStopped()).thenReturn(true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 4c19aa0..6e2059d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling  {
     }
   }
 
-  private void assertLogFileSize(WAL log) {
+  private void assertLogFileSize(WAL log) throws InterruptedException {
     if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
       assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
     } else {
+      for (int i = 0; i < 10; i++) { // check up to 10 times before the final assert
+        if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
+          Thread.sleep(10); // not zero yet; presumably the async archival is still running
+        }
+      }
       assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77..ecbd043 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
@@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider {
    *                   null
    */
   @Override
-  public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
+  public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable)
+      throws IOException {
     if (!initialized.compareAndSet(false, true)) {
       throw new IllegalStateException("WALProvider.init should only be called once.");
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index a899bdc..656932b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -687,7 +687,7 @@ public class TestWALFactory {
     assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
 
     // if providers are not set and do not enable SyncReplicationWALProvider
-    walFactory = new WALFactory(conf, this.currentServername.toString(), false);
+    walFactory = new WALFactory(conf, this.currentServername.toString(), null, false);
     assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
   }