You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/07/27 15:24:18 UTC

[hbase] branch branch-2 updated: HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter.

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new edb4cd5  HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter.
edb4cd5 is described below

commit edb4cd534c6ad96745009b2bcc8587d52d1657a2
Author: stack <st...@apache.org>
AuthorDate: Tue Jun 30 20:53:36 2020 -0700

    HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter.
    
    Also fix three bugs:
    
     * We were trying to delete non-empty directory; weren't doing
     accounting for meta WALs where meta had moved off the server
     (successfully)
     * We were deleting split WALs rather than archiving them.
     * We were not handling corrupt files.
    
    Deprecations and removal of tests of old system.
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  11 +-
 .../org/apache/hadoop/hbase/SplitLogCounters.java  |   5 +
 .../java/org/apache/hadoop/hbase/SplitLogTask.java |   3 +
 .../coordination/SplitLogManagerCoordination.java  |   3 +
 .../coordination/SplitLogWorkerCoordination.java   |  10 +-
 .../coordination/ZkCoordinatedStateManager.java    |   3 +
 .../hadoop/hbase/master/MasterRpcServices.java     |   7 +
 .../hadoop/hbase/master/MasterWalManager.java      |  34 ++-
 .../hbase/master/MetricsMasterWrapperImpl.java     |   3 +
 .../hadoop/hbase/master/SplitLogManager.java       |   3 +
 .../hadoop/hbase/master/SplitWALManager.java       |  41 ++-
 .../master/procedure/ServerCrashProcedure.java     |  55 ++--
 .../master/procedure/SplitWALRemoteProcedure.java  |   2 +-
 .../hadoop/hbase/regionserver/SplitLogWorker.java  |   8 +-
 .../regionserver/handler/WALSplitterHandler.java   |   5 +-
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  | 105 +++-----
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 290 ++++++++++++---------
 .../hadoop/hbase/master/AbstractTestDLS.java       | 239 +----------------
 .../master/assignment/MockMasterServices.java      |  22 +-
 .../hbase/regionserver/TestCleanupMetaWAL.java     |   2 +-
 .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java |  22 +-
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  56 ++--
 .../hadoop/hbase/zookeeper/MetaTableLocator.java   |   4 +-
 .../apache/hadoop/hbase/zookeeper/ZKSplitLog.java  |   3 +
 24 files changed, 392 insertions(+), 544 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1eea08b..16bee93 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1480,9 +1480,18 @@ public final class HConstants {
   public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
     "hbase.client.fast.fail.interceptor.impl";
 
+  /**
+   * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+   *   distributed WAL splitter; see SplitWALManager.
+   */
+  @Deprecated
   public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated";
 
-  public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
+  /**
+   * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0.
+   */
+  @Deprecated
+  public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = false;
 
   public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters";
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
index 6be1131..443c8d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -25,9 +25,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 /**
  * Counters kept by the distributed WAL split log process.
  * Used by master and regionserver packages.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALManager
  */
+@Deprecated
 @InterfaceAudience.Private
 public class SplitLogCounters {
+  private SplitLogCounters() {}
+
   //Spnager counters
   public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder();
   public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index dd4eb93..ca07fcb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -31,7 +31,10 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of
  * this class.  Used by regionserver and master packages.
  * <p>Immutable
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALManager
  */
+@Deprecated
 @InterfaceAudience.Private
 public class SplitLogTask {
   private final ServerName originServer;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
index 8682c91..33d8f2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -40,8 +40,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * Methods required for task life circle: <BR>
  * {@link #checkTaskStillAvailable(String)} Check that task is still there <BR>
  * {@link #checkTasks()} check for unassigned tasks and resubmit them
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALManager
  */
 @InterfaceAudience.Private
+@Deprecated
 public interface SplitLogManagerCoordination {
   /**
    * Detail class that shares data between coordination and split log manager
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index ad74015..a9fae46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -1,5 +1,4 @@
- /**
-  *
+ /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@ -18,16 +17,14 @@
   */
 package org.apache.hadoop.hbase.coordination;
 import java.util.concurrent.atomic.LongAdder;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -44,7 +41,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * <p>
  * Important methods for WALSplitterHandler: <BR>
  * splitting task has completed.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALManager
  */
+@Deprecated
 @InterfaceAudience.Private
 public interface SplitLogWorkerCoordination {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index ba73d53..323e575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -25,7 +25,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter (see SplitWALManager) which doesn't use this zk-based coordinator.
  */
+@Deprecated
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ZkCoordinatedStateManager implements CoordinatedStateManager {
   protected ZKWatcher watcher;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index adf8062..b46abc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
@@ -2429,6 +2430,12 @@ public class MasterRpcServices extends RSRpcServices implements
   @Override
   public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
       ReportProcedureDoneRequest request) throws ServiceException {
+    // Check Masters is up and ready for duty before progressing. Remote side will keep trying.
+    try {
+      this.master.checkServiceStarted();
+    } catch (ServerNotRunningYetException snrye) {
+      throw new ServiceException(snrye);
+    }
     request.getResultList().forEach(result -> {
       if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
         master.remoteProcedureCompleted(result.getProcId());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 3f6bd8a..fe9107f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -55,6 +53,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 public class MasterWalManager {
   private static final Logger LOG = LoggerFactory.getLogger(MasterWalManager.class);
 
+  /**
+   * Filter *in* WAL files that are for the hbase:meta Region.
+   */
   final static PathFilter META_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path p) {
@@ -62,6 +63,9 @@ public class MasterWalManager {
     }
   };
 
+  /**
+   * Filter *out* WAL files that are for the hbase:meta Region; i.e. return user-space WALs only.
+   */
   @VisibleForTesting
   public final static PathFilter NON_META_FILTER = new PathFilter() {
     @Override
@@ -81,10 +85,19 @@ public class MasterWalManager {
 
   // The Path to the old logs dir
   private final Path oldLogDir;
+
   private final Path rootDir;
 
   // create the split log lock
   private final Lock splitLogLock = new ReentrantLock();
+
+  /**
+   * Superceded by {@link SplitWALManager}; i.e. procedure-based WAL splitting rather than
+   *   'classic' zk-coordinated WAL splitting.
+   * @deprecated  since 2.3.0 and 3.0.0 to be removed in 4.0.0; replaced by {@link SplitWALManager}.
+   * @see SplitWALManager
+   */
+  @Deprecated
   private final SplitLogManager splitLogManager;
 
   // Is the fileystem ok?
@@ -101,7 +114,6 @@ public class MasterWalManager {
     this.rootDir = CommonFSUtils.getWALRootDir(conf);
     this.services = services;
     this.splitLogManager = new SplitLogManager(services, conf);
-
     this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
   }
 
@@ -203,7 +215,7 @@ public class MasterWalManager {
    */
   @Deprecated
   public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
-    boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
+    boolean retrySplitting = !conf.getBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY,
         WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
 
     Set<ServerName> serverNames = new HashSet<>();
@@ -360,11 +372,13 @@ public class MasterWalManager {
   }
 
   /**
-   * For meta region open and closed normally on a server, it may leave some meta
-   * WAL in the server's wal dir. Since meta region is no long on this server,
-   * The SCP won't split those meta wals, just leaving them there. So deleting
-   * the wal dir will fail since the dir is not empty. Actually We can safely achive those
-   * meta log and Archiving the meta log and delete the dir.
+   * The hbase:meta region may OPEN and CLOSE without issue on a server and then move elsewhere.
+   * On CLOSE, the WAL for the hbase:meta table may not be archived yet (The WAL is only needed if
+   * hbase:meta did not close cleanaly). Since meta region is no long on this server,
+   * the ServerCrashProcedure won't split these leftover hbase:meta WALs, just leaving them in
+   * the WAL splitting dir. If we try to delete the WAL splitting for the server,  it fail since
+   * the dir is not totally empty. We can safely archive these hbase:meta log; then the
+   * WAL dir can be deleted.
    * @param serverName the server to archive meta log
    */
   public void archiveMetaLog(final ServerName serverName) {
@@ -395,6 +409,4 @@ public class MasterWalManager {
       LOG.warn("Failed archiving meta log for server " + serverName, ie);
     }
   }
-
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
index 4803a49..3c8b971 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
@@ -154,6 +154,9 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
 
   @Override
   public Map<String,Entry<Long,Long>> getTableSpaceUtilization() {
+    if (master == null) {
+      return Collections.emptyMap();
+    }
     QuotaObserverChore quotaChore = master.getQuotaObserverChore();
     if (quotaChore == null) {
       return Collections.emptyMap();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 7ed0d9a..3e0c746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -86,7 +86,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * again. If a task is resubmitted then there is a risk that old "delete task"
  * can delete the re-submission.
  * @see SplitWALManager for an alternate implementation based on Procedures.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALManager.
  */
+@Deprecated
 @InterfaceAudience.Private
 public class SplitLogManager {
   private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 76407e0..9ff84dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,35 +16,36 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.master;
-
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
 import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
 import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
-
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
@@ -78,15 +79,17 @@ public class SplitWALManager {
   private final Path rootDir;
   private final FileSystem fs;
   private final Configuration conf;
+  private final Path walArchiveDir;
 
-  public SplitWALManager(MasterServices master) {
+  public SplitWALManager(MasterServices master) throws IOException {
     this.master = master;
     this.conf = master.getConfiguration();
     this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
         conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
     this.rootDir = master.getMasterFileSystem().getWALRootDir();
+    // TODO: This should be the WAL FS, not the Master FS?
     this.fs = master.getMasterFileSystem().getFileSystem();
-
+    this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
   }
 
   public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
@@ -117,14 +120,24 @@ public class SplitWALManager {
     return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
   }
 
-  public void deleteSplitWAL(String wal) throws IOException {
-    fs.delete(new Path(wal), false);
+  /**
+   * Archive processed WAL
+   */
+  public void archive(String wal) throws IOException {
+    WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
   }
 
   public void deleteWALDir(ServerName serverName) throws IOException {
     Path splitDir = getWALSplitDir(serverName);
-    if (!fs.delete(splitDir, false)) {
-      LOG.warn("Failed delete {}", splitDir);
+    try {
+      if (!fs.delete(splitDir, false)) {
+        LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
+      }
+    } catch (PathIsNotEmptyDirectoryException e) {
+      FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir);
+      LOG.warn("PathIsNotEmptyDirectoryException {}",
+        Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
+      throw e;
     }
   }
 
@@ -197,7 +210,11 @@ public class SplitWALManager {
       this.maxSplitTasks = maxSplitTasks;
       this.master = master;
       this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
-      this.master.getServerManager().registerListener(this);
+      // ServerManager might be null in a test context where we are mocking; allow for this
+      ServerManager sm = this.master.getServerManager();
+      if (sm != null) {
+        sm.registerListener(this);
+      }
     }
 
     public synchronized Optional<ServerName> acquire() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index a6ebbaa..17606c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.master.procedure;
-
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
@@ -154,8 +151,8 @@ public class ServerCrashProcedure
           break;
         case SERVER_CRASH_SPLIT_META_LOGS:
           if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
-            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
-            splitMetaLogs(env);
+              DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+            zkCoordinatedSplitMetaLogs(env);
             setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
           } else {
             am.getRegionStates().metaLogSplitting(serverName);
@@ -164,8 +161,7 @@ public class ServerCrashProcedure
           }
           break;
         case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
-          if(isSplittingDone(env, true)){
-            cleanupSplitDir(env);
+          if (isSplittingDone(env, true)) {
             setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
             am.getRegionStates().metaLogSplit(serverName);
           } else {
@@ -195,7 +191,7 @@ public class ServerCrashProcedure
         case SERVER_CRASH_SPLIT_LOGS:
           if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
             DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
-            splitLogs(env);
+            zkCoordinatedSplitLogs(env);
             setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
           } else {
             am.getRegionStates().logSplitting(this.serverName);
@@ -256,19 +252,27 @@ public class ServerCrashProcedure
   private void cleanupSplitDir(MasterProcedureEnv env) {
     SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
     try {
+      if (!this.carryingMeta) {
+        // If we are NOT carrying hbase:meta, check if any left-over hbase:meta WAL files from an
+        // old hbase:meta tenancy on this server; clean these up if any before trying to remove the
+        // WAL directory of this server or we will fail. See archiveMetaLog comment for more details
+        // on this condition.
+        env.getMasterServices().getMasterWalManager().archiveMetaLog(this.serverName);
+      }
       splitWALManager.deleteWALDir(serverName);
     } catch (IOException e) {
-      LOG.warn("Remove WAL directory of server {} failed, ignore...", serverName, e);
+      LOG.warn("Remove WAL directory for {} failed, ignore...{}", serverName, e.getMessage());
     }
   }
 
   private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
-    LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta);
     SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
     try {
-      return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
+      int wals = splitWALManager.getWALsToSplit(serverName, splitMeta).size();
+      LOG.debug("Check if {} WAL splitting is done? wals={}, meta={}", serverName, wals, splitMeta);
+      return wals == 0;
     } catch (IOException e) {
-      LOG.warn("get filelist of serverName {} failed, retry...", serverName, e);
+      LOG.warn("Get WALs of {} failed, retry...", serverName, e);
       return false;
     }
   }
@@ -293,7 +297,12 @@ public class ServerCrashProcedure
     return hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri);
   }
 
-  private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
+  /**
+   * Split hbase:meta logs using 'classic' zk-based coordination.
+   * Superceded by procedure-based WAL splitting.
+   * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
+   */
+  private void zkCoordinatedSplitMetaLogs(MasterProcedureEnv env) throws IOException {
     LOG.debug("Splitting meta WALs {}", this);
     MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
@@ -303,7 +312,12 @@ public class ServerCrashProcedure
     LOG.debug("Done splitting meta WALs {}", this);
   }
 
-  private void splitLogs(final MasterProcedureEnv env) throws IOException {
+  /**
+   * Split logs using 'classic' zk-based coordination.
+   * Superceded by procedure-based WAL splitting.
+   * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
+   */
+  private void zkCoordinatedSplitLogs(final MasterProcedureEnv env) throws IOException {
     LOG.debug("Splitting WALs {}", this);
     MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
@@ -333,14 +347,12 @@ public class ServerCrashProcedure
       currentRunningState = getCurrentState();
     }
     int childrenLatch = getChildrenLatch();
-    status.setStatus(msg + " current State " + currentRunningState
-        + (childrenLatch > 0 ? "; remaining num of running child procedures = " + childrenLatch
-            : ""));
+    status.setStatus(msg + " current State " + currentRunningState + (childrenLatch > 0?
+      "; remaining num of running child procedures = " + childrenLatch: ""));
   }
 
   @Override
-  protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
-  throws IOException {
+  protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException {
     // Can't rollback.
     throw new UnsupportedOperationException("unhandled state=" + state);
   }
@@ -424,7 +436,8 @@ public class ServerCrashProcedure
     int size = state.getRegionsOnCrashedServerCount();
     if (size > 0) {
       this.regionsOnCrashedServer = new ArrayList<>(size);
-      for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: state.getRegionsOnCrashedServerList()) {
+      for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri:
+          state.getRegionsOnCrashedServerList()) {
         this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
index c829e51..54607e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -96,7 +96,7 @@ public class SplitWALRemoteProcedure extends ServerRemoteProcedure
   protected void complete(MasterProcedureEnv env, Throwable error) {
     if (error == null) {
       try {
-        env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
+        env.getMasterServices().getSplitWALManager().archive(walPath);
       } catch (IOException e) {
         LOG.warn("Failed split of {}; ignore...", walPath, e);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 44c8f5c..864b395 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -56,7 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  * the absence of a global lock there is a unavoidable race here - a worker might have just finished
  * its task when it is stripped of its ownership. Here we rely on the idempotency of the log
  * splitting task for correctness
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ *   distributed WAL splitter, see SplitWALRemoteProcedure
  */
+@Deprecated
 @InterfaceAudience.Private
 public class SplitLogWorker implements Runnable {
 
@@ -101,10 +104,9 @@ public class SplitLogWorker implements Runnable {
     // encountered a bad non-retry-able persistent error.
     try {
       SplitLogWorkerCoordination splitLogWorkerCoordination =
-          server.getCoordinatedStateManager() == null ? null
-              : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+        server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
       if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
-        p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
+          p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
         return Status.PREEMPTED;
       }
     } catch (InterruptedIOException iioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
index d6009e3..ff39531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
 /**
  * Handles log splitting a wal
  * Used by the zk-based distributed log splitting. Created by ZKSplitLogWorkerCoordination.
- */
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+  *   distributed WAL splitter, see SplitWALManager
+  */
+@Deprecated
 @InterfaceAudience.Private
 public class WALSplitterHandler extends EventHandler {
   private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 2b38494..f35bae2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.wal;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,9 +55,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -86,9 +83,6 @@ public final class WALSplitUtil {
    * the splitLogFile() part. If the master crashes then this function might get called multiple
    * times.
    * <p>
-   * @param logfile
-   * @param conf
-   * @throws IOException
    */
   public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
     Path walDir = CommonFSUtils.getWALRootDir(conf);
@@ -99,20 +93,9 @@ public final class WALSplitUtil {
     } else {
       walPath = new Path(walDir, logfile);
     }
-    finishSplitLogFile(walDir, oldLogDir, walPath, conf);
-  }
-
-  static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
-      Configuration conf) throws IOException {
-    List<Path> processedLogs = new ArrayList<>();
-    List<Path> corruptedLogs = new ArrayList<>();
     FileSystem walFS = walDir.getFileSystem(conf);
-    if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
-      corruptedLogs.add(walPath);
-    } else {
-      processedLogs.add(walPath);
-    }
-    archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
+    boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS);
+    archive(walPath, corrupt, oldLogDir, walFS, conf);
     Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
     walFS.delete(stagingDir, true);
   }
@@ -121,40 +104,40 @@ public final class WALSplitUtil {
    * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
    * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
    */
-  private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
-      final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
-    final Path corruptDir =
-      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
-    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
-      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
-        corruptDir);
-    }
-    if (!walFS.mkdirs(corruptDir)) {
-      LOG.info("Unable to mkdir {}", corruptDir);
-    }
-    walFS.mkdirs(oldWALDir);
-
-    // this method can get restarted or called multiple times for archiving
-    // the same log files.
-    for (Path corruptedWAL : corruptedWALs) {
-      Path p = new Path(corruptDir, corruptedWAL.getName());
-      if (walFS.exists(corruptedWAL)) {
-        if (!walFS.rename(corruptedWAL, p)) {
-          LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
-        } else {
-          LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
-        }
+  static void archive(final Path wal, final boolean corrupt, final Path oldWALDir,
+      final FileSystem walFS, final Configuration conf) throws IOException {
+    Path dir;
+    Path target;
+    if (corrupt) {
+      dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+      if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
+        LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir);
       }
+      target = new Path(dir, wal.getName());
+    } else {
+      dir = oldWALDir;
+      target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal);
     }
+    mkdir(walFS, dir);
+    moveWAL(walFS, wal, target);
+  }
 
-    for (Path p : processedWALs) {
-      Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
-      if (walFS.exists(p)) {
-        if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
-          LOG.warn("Unable to move {} to {}", p, newPath);
-        } else {
-          LOG.info("Archived processed log {} to {}", p, newPath);
-        }
+  private static void mkdir(FileSystem fs, Path dir) throws IOException {
+    if (!fs.mkdirs(dir)) {
+      LOG.warn("Failed mkdir {}", dir);
+    }
+  }
+
+  /**
+   * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir.
+   * WAL may have already been moved; makes allowance.
+   */
+  public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
+    if (fs.exists(p)) {
+      if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
+        LOG.warn("Failed move of {} to {}", p, targetDir);
+      } else {
+        LOG.info("Moved {} to {}", p, targetDir);
       }
     }
   }
@@ -171,7 +154,6 @@ public final class WALSplitUtil {
    * @param tmpDirName of the directory used to sideline old recovered edits file
    * @param conf configuration
    * @return Path to file into which to dump split log edits.
-   * @throws IOException
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
@@ -216,7 +198,7 @@ public final class WALSplitUtil {
   /**
    * Get the completed recovered edits file path, renaming it to be by last edit in the file from
    * its first edit. Then we could use the name to skip recovered edits when doing
-   * {@link HRegion#replayRecoveredEditsIfAny}.
+   * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask).
    * @return dstPath take file's last edit log seq num as the name
    */
   static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
@@ -303,7 +285,6 @@ public final class WALSplitUtil {
    * @param walFS WAL FileSystem used to retrieving split edits files.
    * @param regionDir WAL region dir to look for recovered edits files under.
    * @return Files in passed <code>regionDir</code> as a sorted set.
-   * @throws IOException
    */
   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
       final Path regionDir) throws IOException {
@@ -349,7 +330,6 @@ public final class WALSplitUtil {
    * @param fs the file system used to rename bad edits file.
    * @param edits Edits file to move aside.
    * @return The name of the moved aside file.
-   * @throws IOException
    */
   public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
       throws IOException {
@@ -452,9 +432,9 @@ public final class WALSplitUtil {
     }
 
     private final ClientProtos.MutationProto.MutationType type;
-    public final Mutation mutation;
-    public final long nonceGroup;
-    public final long nonce;
+    @SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation;
+    @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup;
+    @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce;
 
     @Override
     public int compareTo(final MutationReplay d) {
@@ -483,12 +463,9 @@ public final class WALSplitUtil {
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
    * WALEdit from the passed in WALEntry
-   * @param entry
-   * @param cells
    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
    *          extracted from the passed in WALEntry.
    * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
-   * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
       CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
@@ -516,7 +493,9 @@ public final class WALSplitUtil {
         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
       }
       Cell cell = cells.current();
-      if (val != null) val.add(cell);
+      if (val != null) {
+        val.add(cell);
+      }
 
       boolean isNewRowOrType =
           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
@@ -575,8 +554,6 @@ public final class WALSplitUtil {
    * @param tableName the table name
    * @param encodedRegionName the encoded region name
    * @param familyName the column family name
-   * @param seqId the sequence id which used to generate file name
-   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
    * @return Path to recovered.hfiles directory of the region's column family.
    */
   static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 8384ab5..657eb22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,6 +29,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -63,24 +62,26 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import javax.validation.constraints.Null;
 
 /**
  * Split RegionServer WAL files. Splits the WAL into new files,
  * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
- * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
- * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
- *   LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
- *   entry-point.
+ * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or
+ * use static helper methods.
  */
 @InterfaceAudience.Private
 public class WALSplitter {
   private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
+  public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";
 
-  /** By default we retry errors in splitting, rather than skipping. */
+  /**
+   * By default we retry errors in splitting, rather than skipping.
+   */
   public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
 
   // Parameters for split process
-  protected final Path walDir;
+  protected final Path walRootDir;
   protected final FileSystem walFS;
   protected final Configuration conf;
   final Path rootDir;
@@ -100,8 +101,6 @@ public class WALSplitter {
 
   private final WALFactory walFactory;
 
-  private MonitoredTask status;
-
   // For checking the latest flushed sequence id
   protected final LastSequenceId sequenceIdChecker;
 
@@ -132,17 +131,28 @@ public class WALSplitter {
 
   public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
   public final static String SPLIT_WAL_WRITER_THREADS =
-      "hbase.regionserver.hlog.splitlog.writer.threads";
+    "hbase.regionserver.hlog.splitlog.writer.threads";
+
+  private final int numWriterThreads;
+  private final long bufferSize;
+  private final boolean splitWriterCreationBounded;
+  private final boolean hfile;
+  private final boolean skipErrors;
+
+  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
+      FileSystem walFS, Path rootDir, FileSystem rootFS) {
+    this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
+  }
 
   @VisibleForTesting
-  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
-      Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
+  WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
+      FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName =
-        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
-    this.walDir = walDir;
+    this.walRootDir = walRootDir;
     this.walFS = walFS;
     this.rootDir = rootDir;
     this.rootFS = rootFS;
@@ -150,32 +160,17 @@ public class WALSplitter {
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
     this.rsServices = rsServices;
     this.walFactory = factory;
-    PipelineController controller = new PipelineController();
     this.tmpDirName =
       conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-
-
     // if we limit the number of writers opened for sinking recovered edits
-    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
-    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
-    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
-    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
-
-    if (splitToHFile) {
-      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
-      outputSink =
-          new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
-    } else if (splitWriterCreationBounded) {
-      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
-      outputSink =
-          new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
-    } else {
-      entryBuffers = new EntryBuffers(controller, bufferSize);
-      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
-    }
+    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+    this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
+    this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
+    this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
+    this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
   }
 
-  WALFactory getWalFactory(){
+  WALFactory getWalFactory() {
     return this.walFactory;
   }
 
@@ -193,17 +188,25 @@ public class WALSplitter {
 
   /**
    * Splits a WAL file.
+   * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests.
+   * Not used by new procedure-based WAL splitter.
+   *
    * @return false if it is interrupted by the progress-able.
    */
   public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
       RegionServerServices rsServices) throws IOException {
+    Preconditions.checkNotNull(splitLogWorkerCoordination,
+      "Can't be null; needed to propagate WAL corruption if any found");
     Path rootDir = CommonFSUtils.getRootDir(conf);
     FileSystem rootFS = rootDir.getFileSystem(conf);
-    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
-        splitLogWorkerCoordination, rsServices);
-    return s.splitLogFile(logfile, reporter);
+    WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
+      splitLogWorkerCoordination, rsServices);
+    // splitWAL returns a data structure with whether split is finished and if the file is corrupt.
+    // We don't need to propagate corruption flag here because it is propagated by the
+    // SplitLogWorkerCoordination.
+    return splitter.splitWAL(logfile, reporter).isFinished();
   }
 
   /**
@@ -214,85 +217,123 @@ public class WALSplitter {
    * @return List of output files created by the split.
    */
   @VisibleForTesting
-  public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
+  public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
       Configuration conf, final WALFactory factory) throws IOException {
     Path rootDir = CommonFSUtils.getRootDir(conf);
     FileSystem rootFS = rootDir.getFileSystem(conf);
-    final FileStatus[] logfiles =
-        SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
+    WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
+    final FileStatus[] wals =
+      SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
     List<Path> splits = new ArrayList<>();
-    if (ArrayUtils.isNotEmpty(logfiles)) {
-      for (FileStatus logfile : logfiles) {
-        WALSplitter s =
-            new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
-        if (s.splitLogFile(logfile, null)) {
-          finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
-          if (s.outputSink.splits != null) {
-            splits.addAll(s.outputSink.splits);
+    if (ArrayUtils.isNotEmpty(wals)) {
+      for (FileStatus wal: wals) {
+        SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
+        if (splitWALResult.isFinished()) {
+          WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
+          if (splitter.outputSink.splits != null) {
+            splits.addAll(splitter.outputSink.splits);
           }
         }
       }
     }
-    if (!walFS.delete(logDir, true)) {
-      throw new IOException("Unable to delete src dir: " + logDir);
+    if (!walFS.delete(walsDir, true)) {
+      throw new IOException("Unable to delete src dir " + walsDir);
     }
     return splits;
   }
 
   /**
-   * WAL splitting implementation, splits one log file.
-   * @param logfile should be an actual log file.
+   * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable).
+   * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if
+   * the WAL is corrupt.
+   */
+  static final class SplitWALResult {
+    private final boolean finished;
+    private final boolean corrupt;
+
+    private SplitWALResult(boolean finished, boolean corrupt) {
+      this.finished = finished;
+      this.corrupt = corrupt;
+    }
+
+    public boolean isFinished() {
+      return finished;
+    }
+
+    public boolean isCorrupt() {
+      return corrupt;
+    }
+  }
+
+  /**
+   * Setup the output sinks and entry buffers ahead of splitting WAL.
+   */
+  private void createOutputSinkAndEntryBuffers() {
+    PipelineController controller = new PipelineController();
+    if (this.hfile) {
+      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
+      this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller,
+        this.entryBuffers, this.numWriterThreads);
+    } else if (this.splitWriterCreationBounded) {
+      this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
+      this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller,
+        this.entryBuffers, this.numWriterThreads);
+    } else {
+      this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
+      this.outputSink = new RecoveredEditsOutputSink(this, controller,
+        this.entryBuffers, this.numWriterThreads);
+    }
+  }
+
+  /**
+   * WAL splitting implementation, splits one WAL file.
+   * @param walStatus should be for an actual WAL file.
    */
   @VisibleForTesting
-  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
-    Preconditions.checkState(status == null);
-    Preconditions.checkArgument(logfile.isFile(),
-        "passed in file status is for something other than a regular file.");
-    boolean isCorrupted = false;
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
-      SPLIT_SKIP_ERRORS_DEFAULT);
+  SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
+    Path wal = walStatus.getPath();
+    Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
+    boolean corrupt = false;
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
-    Path logPath = logfile.getPath();
     boolean outputSinkStarted = false;
-    boolean progressFailed = false;
+    boolean cancelled = false;
     int editsCount = 0;
     int editsSkipped = 0;
-
-    status = TaskMonitor.get().createStatus(
-          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    MonitoredTask status =
+      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
     status.enableStatusJournal(true);
-    Reader logFileReader = null;
-    this.fileBeingSplit = logfile;
+    Reader walReader = null;
+    this.fileBeingSplit = walStatus;
     long startTS = EnvironmentEdgeManager.currentTime();
+    long length = walStatus.getLen();
+    String lengthStr = StringUtils.humanSize(length);
+    createOutputSinkAndEntryBuffers();
     try {
-      long logLength = logfile.getLen();
-      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
-          logLength);
-      status.setStatus("Opening log file " + logPath);
-      if (reporter != null && !reporter.progress()) {
-        progressFailed = true;
-        return false;
+      String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
+      LOG.info(logStr);
+      status.setStatus(logStr);
+      if (cancel != null && !cancel.progress()) {
+        cancelled = true;
+        return new SplitWALResult(false, corrupt);
       }
-      logFileReader = getReader(logfile, skipErrors, reporter);
-      if (logFileReader == null) {
-        LOG.warn("Nothing to split in WAL={}", logPath);
-        return true;
+      walReader = getReader(walStatus, this.skipErrors, cancel);
+      if (walReader == null) {
+        LOG.warn("Nothing in {}; empty?", wal);
+        return new SplitWALResult(true, corrupt);
       }
-      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
-      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
+      LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
       int numOpenedFilesLastCheck = 0;
-      outputSink.setReporter(reporter);
+      outputSink.setReporter(cancel);
       outputSink.setStatus(status);
       outputSink.startWriterThreads();
       outputSinkStarted = true;
       Entry entry;
-      Long lastFlushedSequenceId = -1L;
       startTS = EnvironmentEdgeManager.currentTime();
-      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
+      while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
         String encodedRegionNameAsStr = Bytes.toString(region);
-        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
+        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
           if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
               encodedRegionNameAsStr))) {
@@ -301,8 +342,7 @@ public class WALSplitter {
             // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
             // will get skipped by the seqId check below.
             // See more details at https://issues.apache.org/jira/browse/HBASE-24189
-            LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
-                encodedRegionNameAsStr);
+            LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
             lastFlushedSequenceId = Long.MAX_VALUE;
           } else {
             if (sequenceIdChecker != null) {
@@ -315,7 +355,7 @@ public class WALSplitter {
               regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
               if (LOG.isDebugEnabled()) {
-                LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+                LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
                     + TextFormat.shortDebugString(ids));
               }
             }
@@ -344,9 +384,9 @@ public class WALSplitter {
           String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
               + " edits, skipped " + editsSkipped + " edits.";
           status.setStatus("Split " + countsStr);
-          if (reporter != null && !reporter.progress()) {
-            progressFailed = true;
-            return false;
+          if (cancel != null && !cancel.progress()) {
+            cancelled = true;
+            return new SplitWALResult(false, corrupt);
           }
         }
       }
@@ -355,68 +395,64 @@ public class WALSplitter {
       iie.initCause(ie);
       throw iie;
     } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
-      if (splitLogWorkerCoordination != null) {
+      LOG.warn("Could not parse, corrupt WAL={}", wal, e);
+      // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
+      // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
+      if (this.splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
-        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
-      } else {
-        // for tests only
-        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+        splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
       }
-      isCorrupted = true;
+      corrupt = true;
     } catch (IOException e) {
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      final String log = "Finishing writing output logs and closing down";
+      final String log = "Finishing writing output for " + wal + " so closing down";
       LOG.debug(log);
       status.setStatus(log);
       try {
-        if (null != logFileReader) {
-          logFileReader.close();
+        if (null != walReader) {
+          walReader.close();
         }
       } catch (IOException exception) {
-        LOG.warn("Could not close WAL reader", exception);
+        LOG.warn("Could not close {} reader", wal, exception);
       }
       try {
         if (outputSinkStarted) {
-          // Set progress_failed to true as the immediate following statement will reset its value
-          // when close() throws exception, progress_failed has the right value
-          progressFailed = true;
-          progressFailed = outputSink.close() == null;
+          // Set cancelled to true as the immediate following statement will reset its value.
+          // If close() throws an exception, cancelled will have the right value
+          cancelled = true;
+          cancelled = outputSink.close() == null;
         }
       } finally {
         long processCost = EnvironmentEdgeManager.currentTime() - startTS;
         // See if length got updated post lease recovery
         String msg = "Processed " + editsCount + " edits across " +
-            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
-            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
-            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
-            ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
+          outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost +
+          " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr +
+          ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
         LOG.info(msg);
         status.markComplete(msg);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
-            status.prettyPrintJournal());
+          LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
         }
       }
     }
-    return !progressFailed;
+    return new SplitWALResult(!cancelled, corrupt);
   }
 
-  private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
-      throws IOException {
-    Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
-    return this.rootFS.exists(regionDirPath);
+  private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
+    return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
   }
 
   /**
    * Create a new {@link Reader} for reading logs to split.
+   * @return Returns null if file has length zero or file can't be found.
    */
-  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+  protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel)
       throws IOException, CorruptedLogFileException {
-    Path path = file.getPath();
-    long length = file.getLen();
+    Path path = walStatus.getPath();
+    long length = walStatus.getLen();
     Reader in;
 
     // Check for possibly empty file. With appends, currently Hadoop reports a
@@ -427,9 +463,9 @@ public class WALSplitter {
     }
 
     try {
-      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
+      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
       try {
-        in = getReader(path, reporter);
+        in = getReader(path, cancel);
       } catch (EOFException e) {
         if (length <= 0) {
           // TODO should we ignore an empty, not-last log file if skip.errors
@@ -451,8 +487,8 @@ public class WALSplitter {
       if (!skipErrors || e instanceof InterruptedIOException) {
         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
       }
-      throw new CorruptedLogFileException("skipErrors=true Could not open wal "
-        + path + " ignoring", e);
+      throw new CorruptedLogFileException("skipErrors=true; could not open " + path +
+        ", skipping", e);
     }
     return in;
   }
@@ -463,14 +499,14 @@ public class WALSplitter {
       return in.next();
     } catch (EOFException eof) {
       // truncated files are expected if a RS crashes (see HBASE-2643)
-      LOG.info("EOF from wal {}. Continuing.", path);
+      LOG.info("EOF from {}; continuing.", path);
       return null;
     } catch (IOException e) {
       // If the IOE resulted from bad file format,
       // then this problem is idempotent and retrying won't help
       if (e.getCause() != null && (e.getCause() instanceof ParseException
           || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
-        LOG.warn("Parse exception from wal {}. Continuing", path, e);
+        LOG.warn("Parse exception from {}; continuing", path, e);
         return null;
       }
       if (!skipErrors) {
@@ -493,7 +529,7 @@ public class WALSplitter {
    * Create a new {@link Reader} for reading logs to split.
    * @return new Reader instance, caller should close
    */
-  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
+  private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
     return walFactory.createReader(walFS, curLogFile, reporter);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 2b91ba9..24fdbc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,42 +18,23 @@
 package org.apache.hadoop.hbase.master;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
@@ -65,9 +45,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -77,12 +55,10 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -93,7 +69,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -177,83 +152,6 @@ public abstract class AbstractTestDLS {
   }
 
   @Test
-  public void testRecoveredEdits() throws Exception {
-    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
-    startCluster(NUM_RS);
-
-    int numLogLines = 10000;
-    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
-    // turn off load balancing to prevent regions from moving around otherwise
-    // they will consume recovered.edits
-    master.balanceSwitch(false);
-    FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-
-    Path rootdir = CommonFSUtils.getRootDir(conf);
-
-    int numRegions = 50;
-    try (Table t = installTable(numRegions)) {
-      List<RegionInfo> regions = null;
-      HRegionServer hrs = null;
-      for (int i = 0; i < NUM_RS; i++) {
-        hrs = rsts.get(i).getRegionServer();
-        regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
-        // At least one RS will have >= to average number of regions.
-        if (regions.size() >= numRegions / NUM_RS) {
-          break;
-        }
-      }
-      Path logDir = new Path(rootdir,
-          AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
-
-      LOG.info("#regions = " + regions.size());
-      Iterator<RegionInfo> it = regions.iterator();
-      while (it.hasNext()) {
-        RegionInfo region = it.next();
-        if (region.getTable().getNamespaceAsString()
-            .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
-          it.remove();
-        }
-      }
-
-      makeWAL(hrs, regions, numLogLines, 100);
-
-      slm.splitLogDistributed(logDir);
-
-      int count = 0;
-      for (RegionInfo hri : regions) {
-        @SuppressWarnings("deprecation")
-        Path editsdir = WALSplitUtil
-            .getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf,
-                tableName, hri.getEncodedName()));
-        LOG.debug("Checking edits dir " + editsdir);
-        FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
-          @Override
-          public boolean accept(Path p) {
-            if (WALSplitUtil.isSequenceIdFile(p)) {
-              return false;
-            }
-            return true;
-          }
-        });
-        LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()).
-          collect(Collectors.joining(",")));
-        assertTrue("Edits dir should have more than a one file", files.length > 1);
-        for (int i = 0; i < files.length; i++) {
-          int c = countWAL(files[i].getPath(), fs, conf);
-          count += c;
-        }
-        LOG.info(count + " edits in " + files.length + " recovered edits files.");
-      }
-
-      // check that the log file is moved
-      assertFalse(fs.exists(logDir));
-      assertEquals(numLogLines, count);
-    }
-  }
-
-  @Test
   public void testMasterStartsUpWithLogSplittingWork() throws Exception {
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
     startCluster(NUM_RS);
@@ -303,71 +201,6 @@ public abstract class AbstractTestDLS {
     }
   }
 
-  /**
-   * The original intention of this test was to force an abort of a region server and to make sure
-   * that the failure path in the region servers is properly evaluated. But it is difficult to
-   * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
-   * there is this code path where the master will preempt the region server when master detects
-   * that the region server has aborted.
-   * @throws Exception
-   */
-  // Was marked flaky before Distributed Log Replay cleanup.
-  @Test
-  public void testWorkerAbort() throws Exception {
-    LOG.info("testWorkerAbort");
-    startCluster(3);
-    int numLogLines = 10000;
-    SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
-    FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    HRegionServer hrs = findRSToKill(false);
-    Path rootdir = CommonFSUtils.getRootDir(conf);
-    final Path logDir = new Path(rootdir,
-        AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
-
-    try (Table t = installTable(40)) {
-      makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
-
-      new Thread() {
-        @Override
-        public void run() {
-          try {
-            waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
-          } catch (InterruptedException e) {
-          }
-          for (RegionServerThread rst : rsts) {
-            rst.getRegionServer().abort("testing");
-            break;
-          }
-        }
-      }.start();
-      FileStatus[] logfiles = fs.listStatus(logDir);
-      TaskBatch batch = new TaskBatch();
-      slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
-      // waitForCounter but for one of the 2 counters
-      long curt = System.currentTimeMillis();
-      long waitTime = 80000;
-      long endt = curt + waitTime;
-      while (curt < endt) {
-        if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
-            tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
-            tot_wkr_preempt_task.sum()) == 0) {
-          Thread.sleep(100);
-          curt = System.currentTimeMillis();
-        } else {
-          assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
-              tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
-              tot_wkr_preempt_task.sum()));
-          return;
-        }
-      }
-      fail("none of the following counters went up in " + waitTime + " milliseconds - " +
-          "tot_wkr_task_resigned, tot_wkr_task_err, " +
-          "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
-    }
-  }
-
   @Test
   public void testThreeRSAbort() throws Exception {
     LOG.info("testThreeRSAbort");
@@ -409,75 +242,6 @@ public abstract class AbstractTestDLS {
     }
   }
 
-  @Test
-  public void testDelayedDeleteOnFailure() throws Exception {
-    LOG.info("testDelayedDeleteOnFailure");
-    startCluster(1);
-    final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
-    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
-    final Path logDir =
-      new Path(new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
-        ServerName.valueOf("x", 1, 1).toString());
-    fs.mkdirs(logDir);
-    ExecutorService executor = null;
-    try {
-      final Path corruptedLogFile = new Path(logDir, "x");
-      FSDataOutputStream out;
-      out = fs.create(corruptedLogFile);
-      out.write(0);
-      out.write(Bytes.toBytes("corrupted bytes"));
-      out.close();
-      ZKSplitLogManagerCoordination coordination =
-          (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
-              .getSplitLogManagerCoordination();
-      coordination.setIgnoreDeleteForTesting(true);
-      executor = Executors.newSingleThreadExecutor();
-      Runnable runnable = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            // since the logDir is a fake, corrupted one, so the split log worker
-            // will finish it quickly with error, and this call will fail and throw
-            // an IOException.
-            slm.splitLogDistributed(logDir);
-          } catch (IOException ioe) {
-            try {
-              assertTrue(fs.exists(corruptedLogFile));
-              // this call will block waiting for the task to be removed from the
-              // tasks map which is not going to happen since ignoreZKDeleteForTesting
-              // is set to true, until it is interrupted.
-              slm.splitLogDistributed(logDir);
-            } catch (IOException e) {
-              assertTrue(Thread.currentThread().isInterrupted());
-              return;
-            }
-            fail("did not get the expected IOException from the 2nd call");
-          }
-          fail("did not get the expected IOException from the 1st call");
-        }
-      };
-      Future<?> result = executor.submit(runnable);
-      try {
-        result.get(2000, TimeUnit.MILLISECONDS);
-      } catch (TimeoutException te) {
-        // it is ok, expected.
-      }
-      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
-      executor.shutdownNow();
-      executor = null;
-
-      // make sure the runnable is finished with no exception thrown.
-      result.get();
-    } finally {
-      if (executor != null) {
-        // interrupt the thread in case the test fails in the middle.
-        // it has no effect if the thread is already terminated.
-        executor.shutdownNow();
-      }
-      fs.delete(logDir, true);
-    }
-  }
-
   private Table installTable(int nrs) throws Exception {
     return installTable(nrs, 0);
   }
@@ -504,8 +268,9 @@ public abstract class AbstractTestDLS {
     NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
     LOG.debug("Verifying only catalog and namespace regions are assigned\n");
     if (regions.size() != 2) {
-      for (String oregion : regions)
+      for (String oregion : regions) {
         LOG.debug("Region still online: " + oregion);
+      }
     }
     assertEquals(2 + existingRegions, regions.size());
     LOG.debug("Enabling table\n");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 89c422d..acf85a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.master.assignment;
-
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.mockito.ArgumentMatchers.any;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
 import org.apache.hadoop.hbase.master.MockNoopMasterServices;
 import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.SplitWALManager;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
@@ -61,9 +62,7 @@ import org.apache.zookeeper.KeeperException;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -81,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrEx
 public class MockMasterServices extends MockNoopMasterServices {
   private final MasterFileSystem fileSystemManager;
   private final MasterWalManager walManager;
+  private final SplitWALManager splitWALManager;
   private final AssignmentManager assignmentManager;
   private final TableStateManager tableStateManager;
 
@@ -102,6 +102,10 @@ public class MockMasterServices extends MockNoopMasterServices {
     Superusers.initialize(conf);
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
+    this.splitWALManager =
+      conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)?
+        null: new SplitWALManager(this);
+
     // Mock an AM.
     this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
       @Override
@@ -146,8 +150,8 @@ public class MockMasterServices extends MockNoopMasterServices {
     // to make our test work.
     this.connection =
         HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
-          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
-          RegionInfoBuilder.FIRST_META_REGIONINFO);
+          Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri,
+          MOCK_MASTER_SERVERNAME, RegionInfoBuilder.FIRST_META_REGIONINFO);
     // Set hbase.rootdir into test dir.
     Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
     CommonFSUtils.setRootDir(getConfiguration(), rootdir);
@@ -366,4 +370,8 @@ public class MockMasterServices extends MockNoopMasterServices {
     }
     return builder.build();
   }
+
+  @Override public SplitWALManager getSplitWALManager() {
+    return splitWALManager;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
index fd1e533..63b611d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
@@ -77,7 +77,7 @@ public class TestCleanupMetaWAL {
     Path walPath = new Path(fs.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME);
     for (FileStatus status : CommonFSUtils.listStatus(fs.getFileSystem(), walPath)) {
       if (status.getPath().toString().contains(SPLITTING_EXT)) {
-        fail("Should not have splitting wal dir here:" + status);
+        fail("Splitting WAL dir should have been cleaned up: " + status);
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 7e93932..f36c5b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.wal;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -84,7 +83,7 @@ public class TestWALReaderOnSecureWAL {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
     conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
-    conf.setBoolean("hbase.hlog.split.skip.errors", true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
     CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
   }
@@ -168,21 +167,14 @@ public class TestWALReaderOnSecureWAL {
       wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
       assertFalse(true);
     } catch (IOException ioe) {
-      // expected IOE
+      System.out.println("Expected ioe " + ioe.getMessage());
     }
 
     FileStatus[] listStatus = fs.listStatus(walPath.getParent());
     Path rootdir = CommonFSUtils.getRootDir(conf);
-    try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
-      s.splitLogFile(listStatus[0], null);
-      Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
-        "corrupt");
-      assertTrue(fs.exists(file));
-      // assertFalse("log splitting should have failed", true);
-    } catch (IOException ioe) {
-      assertTrue("WAL should have been sidelined", false);
-    }
+    WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
+    WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null);
+    assertTrue(swr.isCorrupt());
     wals.close();
   }
 
@@ -219,7 +211,7 @@ public class TestWALReaderOnSecureWAL {
     Path rootdir = CommonFSUtils.getRootDir(conf);
     try {
       WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
-      s.splitLogFile(listStatus[0], null);
+      s.splitWAL(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
       assertTrue(!fs.exists(file));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 43cf81f..5f22b45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@@ -91,12 +91,10 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -105,18 +103,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
  */
 @Category({RegionServerTests.class, LargeTests.class})
 public class TestWALSplit {
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestWALSplit.class);
-
-  {
-    // Uncomment the following lines if more verbosity is needed for
-    // debugging (see HBASE-12285 for details).
-    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
-  }
   private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);
 
   private static Configuration conf;
@@ -143,7 +132,6 @@ public class TestWALSplit {
   private static final byte[] VALUE = Bytes.toBytes("v1");
   private static final String WAL_FILE_PREFIX = "wal.dat.";
   private static List<String> REGIONS = new ArrayList<>();
-  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
   private static String ROBBER;
   private static String ZOMBIE;
   private static String [] GROUP = new String [] {"supergroup"};
@@ -223,8 +211,6 @@ public class TestWALSplit {
   /**
    * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
    * Ensures we do not lose edits.
-   * @throws IOException
-   * @throws InterruptedException
    */
   @Test
   public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
@@ -553,7 +539,7 @@ public class TestWALSplit {
 
   @Test
   public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     generateWALs(Integer.MAX_VALUE);
     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
         Corruptions.APPEND_GARBAGE, true);
@@ -562,7 +548,7 @@ public class TestWALSplit {
 
   @Test
   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     generateWALs(Integer.MAX_VALUE);
     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
@@ -571,7 +557,7 @@ public class TestWALSplit {
 
   @Test
   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     generateWALs(Integer.MAX_VALUE);
     corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
         Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
@@ -587,7 +573,7 @@ public class TestWALSplit {
 
   @Test
   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
         .asList(FaultyProtobufLogReader.FailureType.values()).stream()
         .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
@@ -654,14 +640,14 @@ public class TestWALSplit {
   @Test (expected = IOException.class)
   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
       throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
     splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
   }
 
   @Test
   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
       throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
     try {
       splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
     } catch (IOException e) {
@@ -673,7 +659,7 @@ public class TestWALSplit {
 
   private void ignoreCorruption(final Corruptions corruption, final int entryCount,
       final int expectedCount) throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
 
     final String REGION = "region__1";
     REGIONS.clear();
@@ -698,7 +684,7 @@ public class TestWALSplit {
     in.close();
 
     // should not have stored the EOF files as corrupt
-    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
+    FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? fs.listStatus(CORRUPTDIR): new FileStatus[0];
     assertEquals(0, archivedLogs.length);
 
   }
@@ -717,7 +703,7 @@ public class TestWALSplit {
 
   @Test
   public void testLogsGetArchivedAfterSplit() throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
     generateWALs(-1);
     useDifferentDFSClient();
     WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
@@ -793,7 +779,7 @@ public class TestWALSplit {
 
   @Test
   public void testIOEOnOutputThread() throws Exception {
-    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
 
     generateWALs(-1);
     useDifferentDFSClient();
@@ -847,7 +833,7 @@ public class TestWALSplit {
     t.setDaemon(true);
     t.start();
     try {
-      logSplitter.splitLogFile(logfiles[largestLogFile], null);
+      logSplitter.splitWAL(logfiles[largestLogFile], null);
       fail("Didn't throw!");
     } catch (IOException ioe) {
       assertTrue(ioe.toString().contains("Injected"));
@@ -944,7 +930,7 @@ public class TestWALSplit {
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
       boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
-        null, wals, null);
+        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
@@ -1002,9 +988,7 @@ public class TestWALSplit {
     makeRegionDirs(regions);
 
     // Create a splitter that reads and writes the data without touching disk
-    WALSplitter logSplitter =
-        new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
-
+    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
       /* Produce a mock writer that doesn't write anywhere */
       @Override
       protected Writer createWriter(Path logfile) throws IOException {
@@ -1039,8 +1023,8 @@ public class TestWALSplit {
 
       /* Produce a mock reader that generates fake entries */
       @Override
-      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
-          throws IOException {
+      protected Reader getReader(FileStatus file, boolean skipErrors,
+        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
         Reader mockReader = Mockito.mock(Reader.class);
         Mockito.doAnswer(new Answer<Entry>() {
           int index = 0;
@@ -1064,7 +1048,7 @@ public class TestWALSplit {
       }
     };
 
-    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
+    logSplitter.splitWAL(fs.getFileStatus(logPath), null);
 
     // Verify number of written edits per region
     Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
@@ -1119,7 +1103,7 @@ public class TestWALSplit {
   @Test
   public void testSplitLogFileFirstLineCorruptionLog()
       throws IOException {
-    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
     generateWALs(1, 10, -1);
     FileStatus logfile = fs.listStatus(WALDIR)[0];
 
@@ -1175,7 +1159,7 @@ public class TestWALSplit {
       }
     };
     try{
-      logSplitter.splitLogFile(logfiles[0], null);
+      logSplitter.splitWAL(logfiles[0], null);
     } catch (IOException e) {
       LOG.info(e.toString(), e);
       fail("Throws IOException when spliting "
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index c7b45fe..bb02af3 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -221,8 +221,8 @@ public final class MetaTableLocator {
       LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
       return;
     }
-    LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}", replicaId,
-      serverName);
+    LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
+      serverName, state);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.
     MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index f933070..63e2857 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -32,7 +32,10 @@ import org.slf4j.LoggerFactory;
 /**
  * Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed
  * splitting of WAL logs.
+ * @deprecated  since 2.4.0 and 3.0.0 replaced by procedure-based WAL splitting; see
+ *    SplitWALManager.
  */
+@Deprecated
 @InterfaceAudience.Private
 public final class ZKSplitLog {
   private static final Logger LOG = LoggerFactory.getLogger(ZKSplitLog.class);