You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ti...@apache.org on 2019/05/30 02:50:53 UTC

[hbase] branch branch-2 updated: HBASE-22454 refactor WALSplitter

This is an automated email from the ASF dual-hosted git repository.

tianjy pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 34514cc  HBASE-22454 refactor WALSplitter
34514cc is described below

commit 34514ccf0ab29b921044d18d751e8507d6ee23dc
Author: Jingyun Tian <ti...@gmail.com>
AuthorDate: Wed May 29 11:07:52 2019 +0800

    HBASE-22454 refactor WALSplitter
---
 .../ZKSplitLogManagerCoordination.java             |    4 +-
 .../master/assignment/AssignmentManagerUtil.java   |    4 +-
 .../assignment/MergeTableRegionsProcedure.java     |    6 +-
 .../hbase/master/assignment/RegionStateStore.java  |    4 +-
 .../assignment/SplitTableRegionProcedure.java      |    8 +-
 .../master/procedure/DisableTableProcedure.java    |    4 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   25 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   15 +-
 .../RegionReplicaReplicationEndpoint.java          |    4 +-
 .../org/apache/hadoop/hbase/util/HBaseFsck.java    |    4 +-
 .../wal/BoundedLogWriterCreationOutputSink.java    |  151 +++
 .../org/apache/hadoop/hbase/wal/EntryBuffers.java  |  158 +++
 .../hbase/wal/LogRecoveredEditsOutputSink.java     |  460 +++++++
 .../org/apache/hadoop/hbase/wal/OutputSink.java    |  252 ++++
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |  523 +++++++
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 1423 +-------------------
 .../hadoop/hbase/master/AbstractTestDLS.java       |    6 +-
 .../TestDeleteColumnFamilyProcedureFromClient.java |    6 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   14 +-
 .../regionserver/TestHRegionReplayEvents.java      |    2 +-
 .../hbase/regionserver/TestRecoveredEdits.java     |    4 +-
 .../TestRecoveredEditsReplayAndAbort.java          |    4 +-
 .../regionserver/wal/AbstractTestWALReplay.java    |   14 +-
 .../hbase/snapshot/TestRestoreSnapshotHelper.java  |    6 +-
 .../hadoop/hbase/wal/TestReadWriteSeqIdFiles.java  |   16 +-
 .../apache/hadoop/hbase/wal/TestWALMethods.java    |   27 +-
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |   14 +-
 27 files changed, 1680 insertions(+), 1478 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index fac1532..3ff722a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@@ -92,7 +92,7 @@ public class ZKSplitLogManagerCoordination extends ZKListener implements
       @Override
       public Status finish(ServerName workerName, String logfile) {
         try {
-          WALSplitter.finishSplitLogFile(logfile, conf);
+          WALSplitUtil.finishSplitLogFile(logfile, conf);
         } catch (IOException e) {
           LOG.warn("Could not finish splitting of log file " + logfile, e);
           return Status.ERR;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
index 4f9343c..d401141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@@ -229,7 +229,7 @@ final class AssignmentManagerUtil {
   }
 
   static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
-    if (WALSplitter.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
+    if (WALSplitUtil.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
       throw new IOException("Recovered.edits are found in Region: " + regionInfo +
         ", abort split/merge to prevent data loss");
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 241e8f9..11ae8fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -777,11 +777,11 @@ public class MergeTableRegionsProcedure
     long maxSequenceId = -1L;
     for (RegionInfo region : regionsToMerge) {
       maxSequenceId =
-        Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
+        Math.max(maxSequenceId, WALSplitUtil.getMaxRegionSequenceId(
             walFS, getWALRegionDir(env, region)));
     }
     if (maxSequenceId > 0) {
-      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
+      WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
           maxSequenceId);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index ce4bc38..a8491fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -219,7 +219,7 @@ public class RegionStateStore {
   private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
     FileSystem walFS = master.getMasterWalManager().getFileSystem();
     long maxSeqId =
-        WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
+        WALSplitUtil.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
             master.getConfiguration(), region.getTable(), region.getEncodedName()));
     return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 657f397..346905a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -876,11 +876,11 @@ public class SplitTableRegionProcedure
   private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
     FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
     long maxSequenceId =
-      WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
+      WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
     if (maxSequenceId > 0) {
-      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
+      WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
           maxSequenceId);
-      WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
+      WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
           maxSequenceId);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index 9cacc5d..85a8226 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ public class DisableTableProcedure
               for (RegionInfo region : env.getAssignmentManager().getRegionStates()
                 .getRegionsOfTable(tableName)) {
                 long maxSequenceId =
-                  WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
+                    WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
                 long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
                 mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
                   EnvironmentEdgeManager.currentTime()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 24bafab..6e14873 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -174,8 +174,8 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.TraceScope;
@@ -1009,15 +1009,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqId = maxSeqId + 1;
     if (!isRestoredRegion) {
-      long maxSeqIdFromFile =
-          WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
+      long maxSeqIdFromFile = WALSplitUtil.getMaxRegionSequenceId(getWalFileSystem(),
+        getWALRegionDirOfDefaultReplica());
       nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
       // The openSeqNum will always be increase even for read only region, as we rely on it to
       // determine whether a region has been successfully reopend, so here we always need to update
       // the max sequence id file.
       if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
         LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
-        WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
+        WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
+          nextSeqId - 1);
       }
     }
 
@@ -1174,7 +1175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // checking region folder exists is due to many tests which delete the table folder while a
     // table is still online
     if (getWalFileSystem().exists(getWALRegionDir())) {
-      WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
+      WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
         mvcc.getReadPoint());
     }
   }
@@ -4586,13 +4587,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     NavigableSet<Path> filesUnderRootDir = null;
     if (!regionDir.equals(defaultRegionDir)) {
       filesUnderRootDir =
-          WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
+          WALSplitUtil.getSplitEditFilesSorted(rootFS, defaultRegionDir);
       seqid = Math.max(seqid,
           replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
               defaultRegionDir));
     }
 
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
+    NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionDir);
     seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
         files, reporter, regionDir));
 
@@ -4605,7 +4606,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // For debugging data loss issues!
       // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
       // column family. Have to fake out file type too by casting our recovered.edits as storefiles
-      String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
+      String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir).getName();
       Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
       for (Path file: files) {
         fakeStoreFiles.add(
@@ -4682,7 +4683,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                   HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
         }
         if (skipErrors) {
-          Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+          Path p = WALSplitUtil.moveAsideBadEditsFile(fs, edits);
           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
               + "=true so continuing. Renamed " + edits +
               " as " + p, e);
@@ -4862,7 +4863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
         }
       } catch (EOFException eof) {
-        Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
+        Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
         msg = "EnLongAddered EOF. Most likely due to Master failure during " +
             "wal splitting, so we have this data in another edit.  " +
             "Continuing, but renaming " + edits + " as " + p;
@@ -4872,7 +4873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // If the IOE resulted from bad file format,
         // then this problem is idempotent and retrying won't help
         if (ioe.getCause() instanceof ParseException) {
-          Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
+          Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
           msg = "File corruption enLongAddered!  " +
               "Continuing, but renaming " + edits + " as " + p;
           LOG.warn(msg, ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index adfa84f..64ecbc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -140,7 +140,8 @@ import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -1109,14 +1110,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
-      final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
+      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
-      for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
-        WALSplitter.MutationReplay m = it.next();
+      for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
+        MutationReplay m = it.next();
 
-        if (m.type == MutationType.PUT) {
+        if (m.getType() == MutationType.PUT) {
           batchContainsPuts = true;
         } else {
           batchContainsDelete = true;
@@ -1160,7 +1161,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
       return region.batchReplay(mutations.toArray(
-        new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
+        new MutationReplay[mutations.size()]), replaySeqId);
     } finally {
       updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
     }
@@ -2217,7 +2218,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               entry.getKey().getWriteTime());
         }
         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
-        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
+        List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
           cells, walEntry, durability);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0..d1498a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.EntryBuffers;
+import org.apache.hadoop.hbase.wal.OutputSink;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
 import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index a5c1fce..f749061 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -133,7 +133,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
 import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -4463,7 +4463,7 @@ public class HBaseFsck extends Configured implements Closeable {
                   // This is special case if a region is left after split
                   he.hdfsOnlyEdits = true;
                   FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
-                  Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath());
+                  Path ePath = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
                   for (FileStatus subDir : subDirs) {
                     errors.progress();
                     String sdName = subDir.getPath().getName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
new file mode 100644
index 0000000..5fa7bef
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Bounded means the output streams will be no more than the size of threadpool
+ */
+@InterfaceAudience.Private
+public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
+
+  private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+  public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(walSplitter, controller, entryBuffers, numWriters);
+  }
+
+  @Override
+  public List<Path> finishWritingAndClose() throws IOException {
+    boolean isSuccessful;
+    List<Path> result;
+    try {
+      isSuccessful = finishWriting(false);
+    } finally {
+      result = close();
+    }
+    if (isSuccessful) {
+      splits = result;
+    }
+    return splits;
+  }
+
+  @Override
+  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
+      List<Path> paths) throws InterruptedException, ExecutionException {
+    for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
+        .entrySet()) {
+      LOG.info("Submitting writeThenClose of {}",
+        Arrays.toString(buffer.getValue().encodedRegionName));
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          Path dst = writeThenClose(buffer.getValue());
+          paths.add(dst);
+          return null;
+        }
+      });
+    }
+    boolean progress_failed = false;
+    for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+      Future<Void> future = completionService.take();
+      future.get();
+      if (!progress_failed && reporter != null && !reporter.progress()) {
+        progress_failed = true;
+      }
+    }
+
+    return progress_failed;
+  }
+
+  /**
+   * since the splitting process may create multiple output files, we need a map
+   * regionRecoverStatMap to track the output count of each region.
+   * @return a map from encoded region ID to the number of edits written out for that region.
+   */
+  @Override
+  public Map<byte[], Long> getOutputCounts() {
+    Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+    for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
+      regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+    }
+    return regionRecoverStatMapResult;
+  }
+
+  /**
+   * @return the number of recovered regions
+   */
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return regionRecoverStatMap.size();
+  }
+
+  /**
+   * Append the buffer to a new recovered edits file, then close it after all done
+   * @param buffer contain all entries of a certain region
+   * @throws IOException when closeWriter failed
+   */
+  @Override
+  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+    writeThenClose(buffer);
+  }
+
+  private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+    WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
+    if (wap != null) {
+      String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+      Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+      if (value != null) {
+        Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+        regionRecoverStatMap.put(encodedRegionName, newValue);
+      }
+    }
+
+    Path dst = null;
+    List<IOException> thrown = new ArrayList<>();
+    if (wap != null) {
+      dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+    }
+    if (!thrown.isEmpty()) {
+      throw MultipleIOException.createIOException(thrown);
+    }
+    return dst;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
new file mode 100644
index 0000000..f0974be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class which accumulates edits and separates them into a buffer per region while simultaneously
+ * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
+ * pull region-specific buffers from this class.
+ */
+@InterfaceAudience.Private
+public class EntryBuffers {
+  private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
+
+  PipelineController controller;
+
+  Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  /*
+   * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
+   * up bytes from a region if we're already writing data for that region in a different IO thread.
+   */
+  Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+
+  long totalBuffered = 0;
+  long maxHeapUsage;
+  boolean splitWriterCreationBounded;
+
+  public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+    this(controller, maxHeapUsage, false);
+  }
+
+  public EntryBuffers(PipelineController controller, long maxHeapUsage,
+      boolean splitWriterCreationBounded) {
+    this.controller = controller;
+    this.maxHeapUsage = maxHeapUsage;
+    this.splitWriterCreationBounded = splitWriterCreationBounded;
+  }
+
+  /**
+   * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
+   * crossed the specified threshold.
+   */
+  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
+    WALKey key = entry.getKey();
+    RegionEntryBuffer buffer;
+    long incrHeap;
+    synchronized (this) {
+      buffer = buffers.get(key.getEncodedRegionName());
+      if (buffer == null) {
+        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
+        buffers.put(key.getEncodedRegionName(), buffer);
+      }
+      incrHeap = buffer.appendEntry(entry);
+    }
+
+    // If we crossed the chunk threshold, wait for more space to be available
+    synchronized (controller.dataAvailable) {
+      totalBuffered += incrHeap;
+      while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
+        LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
+        controller.dataAvailable.wait(2000);
+      }
+      controller.dataAvailable.notifyAll();
+    }
+    controller.checkForErrors();
+  }
+
+  /**
+   * @return RegionEntryBuffer a buffer of edits to be written.
+   */
+  synchronized RegionEntryBuffer getChunkToWrite() {
+    // The core part of limiting opening writers is it doesn't return chunk only if the
+    // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
+    // region during splitting. It will flush all the logs in the buffer after splitting
+    // through a threadpool, which means the number of writers it created is under control.
+    if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
+      return null;
+    }
+    long biggestSize = 0;
+    byte[] biggestBufferKey = null;
+
+    for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+      long size = entry.getValue().heapSize();
+      if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
+        biggestSize = size;
+        biggestBufferKey = entry.getKey();
+      }
+    }
+    if (biggestBufferKey == null) {
+      return null;
+    }
+
+    RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+    currentlyWriting.add(biggestBufferKey);
+    return buffer;
+  }
+
+  void doneWriting(RegionEntryBuffer buffer) {
+    synchronized (this) {
+      boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+      assert removed;
+    }
+    long size = buffer.heapSize();
+
+    synchronized (controller.dataAvailable) {
+      totalBuffered -= size;
+      // We may unblock writers
+      controller.dataAvailable.notifyAll();
+    }
+  }
+
+  synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+    return currentlyWriting.contains(region);
+  }
+
+  public void waitUntilDrained() {
+    synchronized (controller.dataAvailable) {
+      while (totalBuffered > 0) {
+        try {
+          controller.dataAvailable.wait(2000);
+        } catch (InterruptedException e) {
+          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
+          Thread.interrupted();
+          break;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..aa649e4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ */
+@InterfaceAudience.Private
+public class LogRecoveredEditsOutputSink extends OutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
+  private WALSplitter walSplitter;
+  private FileSystem walFS;
+  private Configuration conf;
+
+  public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    // More threads could potentially write faster at the expense
+    // of causing more disk seeks as the logs are split.
+    // 3. After a certain setting (probably around 3) the
+    // process will be bound on the reader in the current
+    // implementation anyway.
+    super(controller, entryBuffers, numWriters);
+    this.walSplitter = walSplitter;
+    this.walFS = walSplitter.walFS;
+    this.conf = walSplitter.conf;
+  }
+
+  /**
+   * @return null if failed to report progress
+   */
+  @Override
+  public List<Path> finishWritingAndClose() throws IOException {
+    boolean isSuccessful = false;
+    List<Path> result = null;
+    try {
+      isSuccessful = finishWriting(false);
+    } finally {
+      result = close();
+      List<IOException> thrown = closeLogWriters(null);
+      if (CollectionUtils.isNotEmpty(thrown)) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+    }
+    if (isSuccessful) {
+      splits = result;
+    }
+    return splits;
+  }
+
+  // delete the one with fewer wal entries
+  private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
+      throws IOException {
+    long dstMinLogSeqNum = -1L;
+    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+      WAL.Entry entry = reader.next();
+      if (entry != null) {
+        dstMinLogSeqNum = entry.getKey().getSequenceId();
+      }
+    } catch (EOFException e) {
+      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
+        e);
+    }
+    if (wap.minLogSeqNum < dstMinLogSeqNum) {
+      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+          + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+          + walFS.getFileStatus(dst).getLen());
+      if (!walFS.delete(dst, false)) {
+        LOG.warn("Failed deleting of old {}", dst);
+        throw new IOException("Failed deleting of old " + dst);
+      }
+    } else {
+      LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
+          + ", length=" + walFS.getFileStatus(wap.path).getLen());
+      if (!walFS.delete(wap.path, false)) {
+        LOG.warn("Failed deleting of {}", wap.path);
+        throw new IOException("Failed deleting of " + wap.path);
+      }
+    }
+  }
+
+  /**
+   * Close all of the output streams.
+   * @return the list of paths written.
+   */
+  List<Path> close() throws IOException {
+    Preconditions.checkState(!closeAndCleanCompleted);
+
+    final List<Path> paths = new ArrayList<>();
+    final List<IOException> thrown = Lists.newArrayList();
+    ThreadPoolExecutor closeThreadPool =
+        Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+          private int count = 1;
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "split-log-closeStream-" + count++);
+            return t;
+          }
+        });
+    CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
+    boolean progress_failed;
+    try {
+      progress_failed = executeCloseTask(completionService, thrown, paths);
+    } catch (InterruptedException e) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(e);
+      throw iie;
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    if (!thrown.isEmpty()) {
+      throw MultipleIOException.createIOException(thrown);
+    }
+    writersClosed = true;
+    closeAndCleanCompleted = true;
+    if (progress_failed) {
+      return null;
+    }
+    return paths;
+  }
+
+  /**
+   * @param completionService threadPool to execute the closing tasks
+   * @param thrown store the exceptions
+   * @param paths arrayList to store the paths written
+   * @return if close tasks executed successful
+   */
+  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
+      List<Path> paths) throws InterruptedException, ExecutionException {
+    for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(
+          "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
+      }
+      completionService.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
+          Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+          paths.add(dst);
+          return null;
+        }
+      });
+    }
+    boolean progress_failed = false;
+    for (int i = 0, n = this.writers.size(); i < n; i++) {
+      Future<Void> future = completionService.take();
+      future.get();
+      if (!progress_failed && reporter != null && !reporter.progress()) {
+        progress_failed = true;
+      }
+    }
+    return progress_failed;
+  }
+
+  Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
+      List<IOException> thrown) throws IOException {
+    LOG.trace("Closing {}", wap.path);
+    try {
+      wap.writer.close();
+    } catch (IOException ioe) {
+      LOG.error("Could not close log at {}", wap.path, ioe);
+      thrown.add(ioe);
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
+          + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
+    }
+    if (wap.editsWritten == 0) {
+      // just remove the empty recovered.edits file
+      if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
+        LOG.warn("Failed deleting empty {}", wap.path);
+        throw new IOException("Failed deleting empty  " + wap.path);
+      }
+      return null;
+    }
+
+    Path dst = getCompletedRecoveredEditsFilePath(wap.path,
+      regionMaximumEditLogSeqNum.get(encodedRegionName));
+    try {
+      if (!dst.equals(wap.path) && walFS.exists(dst)) {
+        deleteOneWithFewerEntries(wap, dst);
+      }
+      // Skip the unit tests which create a splitter that reads and
+      // writes the data without touching disk.
+      // TestHLogSplit#testThreading is an example.
+      if (walFS.exists(wap.path)) {
+        if (!walFS.rename(wap.path, dst)) {
+          throw new IOException("Failed renaming " + wap.path + " to " + dst);
+        }
+        LOG.info("Rename {} to {}", wap.path, dst);
+      }
+    } catch (IOException ioe) {
+      LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
+      thrown.add(ioe);
+      return null;
+    }
+    return dst;
+  }
+
+  private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
+    if (writersClosed) {
+      return thrown;
+    }
+    if (thrown == null) {
+      thrown = Lists.newArrayList();
+    }
+    try {
+      for (WriterThread writerThread : writerThreads) {
+        while (writerThread.isAlive()) {
+          writerThread.setShouldStop(true);
+          writerThread.interrupt();
+          try {
+            writerThread.join(10);
+          } catch (InterruptedException e) {
+            IOException iie = new InterruptedIOException();
+            iie.initCause(e);
+            throw iie;
+          }
+        }
+      }
+    } finally {
+      WALSplitter.WriterAndPath wap = null;
+      for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
+        try {
+          wap = (WALSplitter.WriterAndPath) tmpWAP;
+          wap.writer.close();
+        } catch (IOException ioe) {
+          LOG.error("Couldn't close log at {}", wap.path, ioe);
+          thrown.add(ioe);
+          continue;
+        }
+        LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
+            + (wap.nanosSpent / 1000 / 1000) + "ms)");
+      }
+      writersClosed = true;
+    }
+
+    return thrown;
+  }
+
+  /**
+   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+   * long as multiple threads are always acting on different regions.
+   * @return null if this region shouldn't output any logs
+   */
+  WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
+    byte[] region = entry.getKey().getEncodedRegionName();
+    String regionName = Bytes.toString(region);
+    WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
+    if (ret != null) {
+      return ret;
+    }
+    // If we already decided that this region doesn't get any output
+    // we don't need to check again.
+    if (blacklistedRegions.contains(region)) {
+      return null;
+    }
+    ret = createWAP(region, entry);
+    if (ret == null) {
+      blacklistedRegions.add(region);
+      return null;
+    }
+    if (reusable) {
+      writers.put(regionName, ret);
+    }
+    return ret;
+  }
+
+  /**
+   * @return a path with a write for that path. caller should close.
+   */
+  WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
+    String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path regionedits = getRegionSplitEditsPath(entry,
+      walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
+    if (regionedits == null) {
+      return null;
+    }
+    FileSystem walFs = FSUtils.getWALFileSystem(conf);
+    if (walFs.exists(regionedits)) {
+      LOG.warn("Found old edits file. It could be the "
+          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+          + walFs.getFileStatus(regionedits).getLen());
+      if (!walFs.delete(regionedits, false)) {
+        LOG.warn("Failed delete of old {}", regionedits);
+      }
+    }
+    WALProvider.Writer w = walSplitter.createWriter(regionedits);
+    LOG.debug("Creating writer path={}", regionedits);
+    return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
+  }
+
+
+
+  void filterCellByStore(WAL.Entry logEntry) {
+    Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
+        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+    if (MapUtils.isEmpty(maxSeqIdInStores)) {
+      return;
+    }
+    // Create the array list for the cells that aren't filtered.
+    // We make the assumption that most cells will be kept.
+    ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
+    for (Cell cell : logEntry.getEdit().getCells()) {
+      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        keptCells.add(cell);
+      } else {
+        byte[] family = CellUtil.cloneFamily(cell);
+        Long maxSeqId = maxSeqIdInStores.get(family);
+        // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
+        // or the master was crashed before and we can not get the information.
+        if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
+          keptCells.add(cell);
+        }
+      }
+    }
+
+    // Anything in the keptCells array list is still live.
+    // So rather than removing the cells from the array list
+    // which would be an O(n^2) operation, we just replace the list
+    logEntry.getEdit().setCells(keptCells);
+  }
+
+  @Override
+  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+    appendBuffer(buffer, true);
+  }
+
+  WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
+      throws IOException {
+    List<WAL.Entry> entries = buffer.entryBuffer;
+    if (entries.isEmpty()) {
+      LOG.warn("got an empty buffer, skipping");
+      return null;
+    }
+
+    WALSplitter.WriterAndPath wap = null;
+
+    long startTime = System.nanoTime();
+    try {
+      int editsCount = 0;
+
+      for (WAL.Entry logEntry : entries) {
+        if (wap == null) {
+          wap = getWriterAndPath(logEntry, reusable);
+          if (wap == null) {
+            // This log spews the full edit. Can be massive in the log. Enable only debugging
+            // WAL lost edit issues.
+            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
+            return null;
+          }
+        }
+        filterCellByStore(logEntry);
+        if (!logEntry.getEdit().isEmpty()) {
+          wap.writer.append(logEntry);
+          this.updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        } else {
+          wap.incrementSkippedEdits(1);
+        }
+      }
+      // Pass along summary statistics
+      wap.incrementEdits(editsCount);
+      wap.incrementNanoTime(System.nanoTime() - startTime);
+    } catch (IOException e) {
+      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+      LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
+      throw e;
+    }
+    return wap;
+  }
+
+  @Override
+  public boolean keepRegionEvent(WAL.Entry entry) {
+    ArrayList<Cell> cells = entry.getEdit().getCells();
+    for (Cell cell : cells) {
+      if (WALEdit.isCompactionMarker(cell)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return a map from encoded region ID to the number of edits written out for that region.
+   */
+  @Override
+  public Map<byte[], Long> getOutputCounts() {
+    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
+      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+    }
+    return ret;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return writers.size();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
new file mode 100644
index 0000000..729ea8b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * The following class is an abstraction class to provide a common interface to support different
+ * ways of consuming recovered edits.
+ */
+@InterfaceAudience.Private
+public abstract class OutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
+
+  protected WALSplitter.PipelineController controller;
+  protected EntryBuffers entryBuffers;
+
+  protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+      new ConcurrentHashMap<>();
+
+  protected final List<WriterThread> writerThreads = Lists.newArrayList();
+
+  /* Set of regions which we've decided should not output edits */
+  protected final Set<byte[]> blacklistedRegions =
+      Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
+
+  protected boolean closeAndCleanCompleted = false;
+
+  protected boolean writersClosed = false;
+
+  protected final int numThreads;
+
+  protected CancelableProgressable reporter = null;
+
+  protected AtomicLong skippedEdits = new AtomicLong();
+
+  protected List<Path> splits = null;
+
+  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
+      int numWriters) {
+    numThreads = numWriters;
+    this.controller = controller;
+    this.entryBuffers = entryBuffers;
+  }
+
+  void setReporter(CancelableProgressable reporter) {
+    this.reporter = reporter;
+  }
+
+  /**
+   * Start the threads that will pump data from the entryBuffers to the output files.
+   */
+  public synchronized void startWriterThreads() {
+    for (int i = 0; i < numThreads; i++) {
+      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
+      t.start();
+      writerThreads.add(t);
+    }
+  }
+
+  /**
+   * Update region's maximum edit log SeqNum.
+   */
+  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
+    synchronized (regionMaximumEditLogSeqNum) {
+      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
+      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
+        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
+      }
+    }
+  }
+
+  /**
+   * @return the number of currently opened writers
+   */
+  int getNumOpenWriters() {
+    return this.writers.size();
+  }
+
+  long getSkippedEdits() {
+    return this.skippedEdits.get();
+  }
+
+  /**
+   * Wait for writer threads to dump all info to the sink
+   * @return true when there is no error
+   */
+  protected boolean finishWriting(boolean interrupt) throws IOException {
+    LOG.debug("Waiting for split writer threads to finish");
+    boolean progress_failed = false;
+    for (WriterThread t : writerThreads) {
+      t.finish();
+    }
+    if (interrupt) {
+      for (WriterThread t : writerThreads) {
+        t.interrupt(); // interrupt the writer threads. We are stopping now.
+      }
+    }
+
+    for (WriterThread t : writerThreads) {
+      if (!progress_failed && reporter != null && !reporter.progress()) {
+        progress_failed = true;
+      }
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(ie);
+        throw iie;
+      }
+    }
+    controller.checkForErrors();
+    LOG.info("{} split writers finished; closing.", this.writerThreads.size());
+    return (!progress_failed);
+  }
+
+  public abstract List<Path> finishWritingAndClose() throws IOException;
+
+  /**
+   * @return a map from encoded region ID to the number of edits written out for that region.
+   */
+  public abstract Map<byte[], Long> getOutputCounts();
+
+  /**
+   * @return number of regions we've recovered
+   */
+  public abstract int getNumberOfRecoveredRegions();
+
+  /**
+   * @param buffer A WAL Edit Entry
+   */
+  public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
+
+  /**
+   * WriterThread call this function to help flush internal remaining edits in buffer before close
+   * @return true when underlying sink has something to flush
+   */
+  public boolean flush() throws IOException {
+    return false;
+  }
+
+  /**
+   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
+   * want to get all of those edits.
+   * @return Return true if this sink wants to accept this region-level WALEdit.
+   */
+  public abstract boolean keepRegionEvent(WAL.Entry entry);
+
+  public static class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+    private WALSplitter.PipelineController controller;
+    private EntryBuffers entryBuffers;
+    private OutputSink outputSink = null;
+
+    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
+        OutputSink sink, int i) {
+      super(Thread.currentThread().getName() + "-Writer-" + i);
+      this.controller = controller;
+      this.entryBuffers = entryBuffers;
+      outputSink = sink;
+    }
+
+    @Override
+    public void run()  {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Exiting thread", t);
+        controller.writerThreadError(t);
+      }
+    }
+
+    private void doRun() throws IOException {
+      LOG.trace("Writer thread starting");
+      while (true) {
+        WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        if (buffer == null) {
+          // No data currently available, wait on some more to show up
+          synchronized (controller.dataAvailable) {
+            if (shouldStop && !this.outputSink.flush()) {
+              return;
+            }
+            try {
+              controller.dataAvailable.wait(500);
+            } catch (InterruptedException ie) {
+              if (!shouldStop) {
+                throw new RuntimeException(ie);
+              }
+            }
+          }
+          continue;
+        }
+
+        assert buffer != null;
+        try {
+          writeBuffer(buffer);
+        } finally {
+          entryBuffers.doneWriting(buffer);
+        }
+      }
+    }
+
+    private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+      outputSink.append(buffer);
+    }
+
+    void setShouldStop(boolean shouldStop) {
+      this.shouldStop = shouldStop;
+    }
+
+    void finish() {
+      synchronized (controller.dataAvailable) {
+        shouldStop = true;
+        controller.dataAvailable.notifyAll();
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
new file mode 100644
index 0000000..d518f2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * This class provides static methods to support WAL splitting related works
+ */
+@InterfaceAudience.Private
+public final class WALSplitUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);
+
+  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
+  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
+  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
+  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
+
+  private WALSplitUtil() {
+  }
+
+  /**
+   * Completes the work done by splitLogFile by archiving logs
+   * <p>
+   * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers have completed
+   * the splitLogFile() part. If the master crashes then this function might get called multiple
+   * times.
+   * <p>
+   * @param logfile
+   * @param conf
+   * @throws IOException
+   */
+  public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
+    Path walDir = FSUtils.getWALRootDir(conf);
+    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path walPath;
+    if (FSUtils.isStartingWithPath(walDir, logfile)) {
+      walPath = new Path(logfile);
+    } else {
+      walPath = new Path(walDir, logfile);
+    }
+    finishSplitLogFile(walDir, oldLogDir, walPath, conf);
+  }
+
+  static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
+      Configuration conf) throws IOException {
+    List<Path> processedLogs = new ArrayList<>();
+    List<Path> corruptedLogs = new ArrayList<>();
+    FileSystem walFS = walDir.getFileSystem(conf);
+    if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
+      corruptedLogs.add(walPath);
+    } else {
+      processedLogs.add(walPath);
+    }
+    archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
+    walFS.delete(stagingDir, true);
+  }
+
+  /**
+   * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
+   * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
+   */
+  private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
+      final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
+    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
+      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
+        corruptDir);
+    }
+    if (!walFS.mkdirs(corruptDir)) {
+      LOG.info("Unable to mkdir {}", corruptDir);
+    }
+    walFS.mkdirs(oldWALDir);
+
+    // this method can get restarted or called multiple times for archiving
+    // the same log files.
+    for (Path corruptedWAL : corruptedWALs) {
+      Path p = new Path(corruptDir, corruptedWAL.getName());
+      if (walFS.exists(corruptedWAL)) {
+        if (!walFS.rename(corruptedWAL, p)) {
+          LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
+        } else {
+          LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
+        }
+      }
+    }
+
+    for (Path p : processedWALs) {
+      Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
+      if (walFS.exists(p)) {
+        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
+          LOG.warn("Unable to move {} to {}", p, newPath);
+        } else {
+          LOG.info("Archived processed log {} to {}", p, newPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in <code>logEntry</code>
+   * named for the sequenceid in the passed <code>logEntry</code>: e.g.
+   * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
+   * RECOVERED_EDITS_DIR under the region creating it if necessary.
+   * @param walEntry walEntry to recover
+   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
+   * @param tmpDirName of the directory used to sideline old recovered edits file
+   * @param conf configuration
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  @VisibleForTesting
+  static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
+      String tmpDirName, Configuration conf) throws IOException {
+    FileSystem walFS = FSUtils.getWALFileSystem(conf);
+    Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
+    String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
+    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+    Path dir = getRegionDirRecoveredEditsDir(regionDir);
+
+    if (walFS.exists(dir) && walFS.isFile(dir)) {
+      Path tmp = new Path(tmpDirName);
+      if (!walFS.exists(tmp)) {
+        walFS.mkdirs(tmp);
+      }
+      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+      LOG.warn("Found existing old file: {}. It could be some "
+          + "leftover of an old installation. It should be a folder instead. "
+          + "So moving it to {}",
+        dir, tmp);
+      if (!walFS.rename(dir, tmp)) {
+        LOG.warn("Failed to sideline old file {}", dir);
+      }
+    }
+
+    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
+      LOG.warn("mkdir failed on {}", dir);
+    }
+    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
+    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
+    // region's replayRecoveredEdits will not delete it
+    String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
+    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
+    return new Path(dir, fileName);
+  }
+
+  private static String getTmpRecoveredEditsFileName(String fileName) {
+    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
+  }
+
+  /**
+   * Get the completed recovered edits file path, renaming it to be by last edit in the file from
+   * its first edit. Then we could use the name to skip recovered edits when doing
+   * {@link HRegion#replayRecoveredEditsIfAny}.
+   * @return dstPath take file's last edit log seq num as the name
+   */
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
+    return new Path(srcPath.getParent(), fileName);
+  }
+
+  @VisibleForTesting
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid);
+  }
+
+  /**
+   * @param regionDir This regions directory in the filesystem.
+   * @return The directory that holds recovered edits files for the region <code>regionDir</code>
+   */
+  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
+    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
+  }
+
+  /**
+   * Check whether there is recovered.edits in the region dir
+   * @param conf conf
+   * @param regionInfo the region to check
+   * @throws IOException IOException
+   * @return true if recovered.edits exist in the region dir
+   */
+  public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
+      throws IOException {
+    // No recovered.edits for non default replica regions
+    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+      return false;
+    }
+    // Only default replica region can reach here, so we can use regioninfo
+    // directly without converting it to default replica's regioninfo.
+    Path regionDir =
+        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
+    NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
+    return files != null && !files.isEmpty();
+  }
+
+  /**
+   * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
+   * @param walFS WAL FileSystem used to retrieving split edits files.
+   * @param regionDir WAL region dir to look for recovered edits files under.
+   * @return Files in passed <code>regionDir</code> as a sorted set.
+   * @throws IOException
+   */
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
+      final Path regionDir) throws IOException {
+    NavigableSet<Path> filesSorted = new TreeSet<>();
+    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
+    if (!walFS.exists(editsdir)) {
+      return filesSorted;
+    }
+    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix. See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+          result = walFS.isFile(p) && m.matches();
+          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+          // because it means splitwal thread is writting this file.
+          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+            result = false;
+          }
+          // Skip SeqId Files
+          if (isSequenceIdFile(p)) {
+            result = false;
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on {}", p, e);
+        }
+        return result;
+      }
+    });
+    if (ArrayUtils.isNotEmpty(files)) {
+      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
+    }
+    return filesSorted;
+  }
+
+  /**
+   * Move aside a bad edits file.
+   * @param walFS WAL FileSystem used to rename bad edits file.
+   * @param edits Edits file to move aside.
+   * @return The name of the moved aside file.
+   * @throws IOException
+   */
+  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
+      throws IOException {
+    Path moveAsideName =
+        new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
+    if (!walFS.rename(edits, moveAsideName)) {
+      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
+    }
+    return moveAsideName;
+  }
+
+  /**
+   * Is the given file a region open sequence id file.
+   */
+  @VisibleForTesting
+  public static boolean isSequenceIdFile(final Path file) {
+    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
+        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
+  }
+
+  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
+      throws IOException {
+    // TODO: Why are we using a method in here as part of our normal region open where
+    // there is no splitting involved? Fix. St.Ack 01/20/2017.
+    Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
+    try {
+      FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
+      return files != null ? files : new FileStatus[0];
+    } catch (FileNotFoundException e) {
+      return new FileStatus[0];
+    }
+  }
+
+  private static long getMaxSequenceId(FileStatus[] files) {
+    long maxSeqId = -1L;
+    for (FileStatus file : files) {
+      String fileName = file.getPath().getName();
+      try {
+        maxSeqId = Math.max(maxSeqId, Long
+            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
+      } catch (NumberFormatException ex) {
+        LOG.warn("Invalid SeqId File Name={}", fileName);
+      }
+    }
+    return maxSeqId;
+  }
+
+  /**
+   * Get the max sequence id which is stored in the region directory. -1 if none.
+   */
+  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
+    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
+  }
+
+  /**
+   * Create a file with name as region's max sequence id
+   */
+  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
+      throws IOException {
+    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
+    long maxSeqId = getMaxSequenceId(files);
+    if (maxSeqId > newMaxSeqId) {
+      throw new IOException("The new max sequence id " + newMaxSeqId
+          + " is less than the old max sequence id " + maxSeqId);
+    }
+    // write a new seqId file
+    Path newSeqIdFile =
+        new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
+    if (newMaxSeqId != maxSeqId) {
+      try {
+        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
+          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+        }
+        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
+          maxSeqId);
+      } catch (FileAlreadyExistsException ignored) {
+        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
+      }
+    }
+    // remove old ones
+    for (FileStatus status : files) {
+      if (!newSeqIdFile.equals(status.getPath())) {
+        walFS.delete(status.getPath(), false);
+      }
+    }
+  }
+
+  /** A struct used by getMutationsFromWALEntry */
+  public static class MutationReplay implements Comparable<MutationReplay> {
+    public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
+        long nonceGroup, long nonce) {
+      this.type = type;
+      this.mutation = mutation;
+      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
+        // using ASYNC_WAL for relay
+        this.mutation.setDurability(Durability.ASYNC_WAL);
+      }
+      this.nonceGroup = nonceGroup;
+      this.nonce = nonce;
+    }
+
+    private final ClientProtos.MutationProto.MutationType type;
+    public final Mutation mutation;
+    public final long nonceGroup;
+    public final long nonce;
+
+    @Override
+    public int compareTo(final MutationReplay d) {
+      return this.mutation.compareTo(d.mutation);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof MutationReplay)) {
+        return false;
+      } else {
+        return this.compareTo((MutationReplay) obj) == 0;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return this.mutation.hashCode();
+    }
+
+    public ClientProtos.MutationProto.MutationType getType() {
+      return type;
+    }
+  }
+
+  /**
+   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &amp;
+   * WALEdit from the passed in WALEntry
+   * @param entry
+   * @param cells
+   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
+   *          extracted from the passed in WALEntry.
+   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
+   * @throws IOException
+   */
+  public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
+      CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
+    if (entry == null) {
+      // return an empty array
+      return Collections.emptyList();
+    }
+
+    long replaySeqId =
+        (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
+            : entry.getKey().getLogSequenceNumber();
+    int count = entry.getAssociatedCellCount();
+    List<MutationReplay> mutations = new ArrayList<>();
+    Cell previousCell = null;
+    Mutation m = null;
+    WALKeyImpl key = null;
+    WALEdit val = null;
+    if (logEntry != null) {
+      val = new WALEdit();
+    }
+
+    for (int i = 0; i < count; i++) {
+      // Throw index out of bounds if our cell count is off
+      if (!cells.advance()) {
+        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+      }
+      Cell cell = cells.current();
+      if (val != null) val.add(cell);
+
+      boolean isNewRowOrType =
+          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+              || !CellUtil.matchingRows(previousCell, cell);
+      if (isNewRowOrType) {
+        // Create new mutation
+        if (CellUtil.isDelete(cell)) {
+          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          // Deletes don't have nonces.
+          mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
+              HConstants.NO_NONCE, HConstants.NO_NONCE));
+        } else {
+          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          // Puts might come from increment or append, thus we need nonces.
+          long nonceGroup =
+              entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
+          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
+          mutations.add(
+            new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
+        }
+      }
+      if (CellUtil.isDelete(cell)) {
+        ((Delete) m).add(cell);
+      } else {
+        ((Put) m).add(cell);
+      }
+      m.setDurability(durability);
+      previousCell = cell;
+    }
+
+    // reconstruct WALKey
+    if (logEntry != null) {
+      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
+          entry.getKey();
+      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
+      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
+        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
+      }
+      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
+          TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
+          walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
+          walKeyProto.getNonce(), null);
+      logEntry.setFirst(key);
+      logEntry.setSecond(val);
+    }
+
+    return mutations;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index c436db2..300fbf6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,74 +17,43 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
+
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -92,16 +61,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region, for
@@ -148,12 +111,11 @@ public class WALSplitter {
 
 
   @VisibleForTesting
-  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
-      FileSystem walFS, LastSequenceId idChecker,
-      SplitLogWorkerCoordination splitLogWorkerCoordination) {
+  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
+      LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
-    String codecClassName = conf
-        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+    String codecClassName =
+        conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.walDir = walDir;
     this.walFS = walFS;
@@ -170,14 +132,27 @@ public class WALSplitter {
         splitWriterCreationBounded);
 
     int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    if(splitWriterCreationBounded){
-      outputSink = new BoundedLogWriterCreationOutputSink(
-          controller, entryBuffers, numWriterThreads);
-    }else {
-      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+    if (splitWriterCreationBounded) {
+      outputSink =
+          new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
+    } else {
+      outputSink =
+          new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     }
   }
 
+  WALFactory getWalFactory(){
+    return this.walFactory;
+  }
+
+  FileStatus getFileBeingSplit() {
+    return fileBeingSplit;
+  }
+
+  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
+    return regionMaxSeqIdInStores;
+  }
+
   /**
    * Splits a WAL file into region's recovered-edits directory.
    * This is the main entry point for distributed log splitting from SplitLogWorker.
@@ -361,357 +336,7 @@ public class WALSplitter {
   }
 
   /**
-   * Completes the work done by splitLogFile by archiving logs
-   * <p>
-   * It is invoked by SplitLogManager once it knows that one of the
-   * SplitLogWorkers have completed the splitLogFile() part. If the master
-   * crashes then this function might get called multiple times.
-   * <p>
-   * @param logfile
-   * @param conf
-   * @throws IOException
-   */
-  public static void finishSplitLogFile(String logfile,
-      Configuration conf)  throws IOException {
-    Path walDir = FSUtils.getWALRootDir(conf);
-    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    Path logPath;
-    if (FSUtils.isStartingWithPath(walDir, logfile)) {
-      logPath = new Path(logfile);
-    } else {
-      logPath = new Path(walDir, logfile);
-    }
-    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
-  }
-
-  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
-      Path logPath, Configuration conf) throws IOException {
-    List<Path> processedLogs = new ArrayList<>();
-    List<Path> corruptedLogs = new ArrayList<>();
-    FileSystem walFS = walDir.getFileSystem(conf);
-    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
-      corruptedLogs.add(logPath);
-    } else {
-      processedLogs.add(logPath);
-    }
-    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
-    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
-    walFS.delete(stagingDir, true);
-  }
-
-  /**
-   * Moves processed logs to a oldLogDir after successful processing Moves
-   * corrupted logs (any log that couldn't be successfully parsed to corruptDir
-   * (.corrupt) for later investigation
-   *
-   * @param corruptedLogs
-   * @param processedLogs
-   * @param oldLogDir
-   * @param walFS WAL FileSystem to archive files on.
-   * @param conf
-   * @throws IOException
-   */
-  private static void archiveLogs(
-      final List<Path> corruptedLogs,
-      final List<Path> processedLogs, final Path oldLogDir,
-      final FileSystem walFS, final Configuration conf) throws IOException {
-    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
-    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
-      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
-          corruptDir);
-    }
-    if (!walFS.mkdirs(corruptDir)) {
-      LOG.info("Unable to mkdir {}", corruptDir);
-    }
-    walFS.mkdirs(oldLogDir);
-
-    // this method can get restarted or called multiple times for archiving
-    // the same log files.
-    for (Path corrupted : corruptedLogs) {
-      Path p = new Path(corruptDir, corrupted.getName());
-      if (walFS.exists(corrupted)) {
-        if (!walFS.rename(corrupted, p)) {
-          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
-        } else {
-          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
-        }
-      }
-    }
-
-    for (Path p : processedLogs) {
-      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
-      if (walFS.exists(p)) {
-        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
-          LOG.warn("Unable to move {} to {}", p, newPath);
-        } else {
-          LOG.info("Archived processed log {} to {}", p, newPath);
-        }
-      }
-    }
-  }
-
-  /**
-   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
-   * <code>logEntry</code> named for the sequenceid in the passed
-   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
-   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
-   * creating it if necessary.
-   * @param logEntry
-   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
-   * @param tmpDirName of the directory used to sideline old recovered edits file
-   * @param conf
-   * @return Path to file into which to dump split log edits.
-   * @throws IOException
-   */
-  @SuppressWarnings("deprecation")
-  @VisibleForTesting
-  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
-      String tmpDirName, Configuration conf) throws IOException {
-    FileSystem walFS = FSUtils.getWALFileSystem(conf);
-    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
-    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
-    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
-    Path dir = getRegionDirRecoveredEditsDir(regionDir);
-
-
-    if (walFS.exists(dir) && walFS.isFile(dir)) {
-      Path tmp = new Path(tmpDirName);
-      if (!walFS.exists(tmp)) {
-        walFS.mkdirs(tmp);
-      }
-      tmp = new Path(tmp,
-        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
-      LOG.warn("Found existing old file: {}. It could be some "
-        + "leftover of an old installation. It should be a folder instead. "
-        + "So moving it to {}", dir, tmp);
-      if (!walFS.rename(dir, tmp)) {
-        LOG.warn("Failed to sideline old file {}", dir);
-      }
-    }
-
-    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
-      LOG.warn("mkdir failed on {}", dir);
-    }
-    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
-    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
-    // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
-    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
-    return new Path(dir, fileName);
-  }
-
-  private static String getTmpRecoveredEditsFileName(String fileName) {
-    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
-  }
-
-  /**
-   * Get the completed recovered edits file path, renaming it to be by last edit
-   * in the file from its first edit. Then we could use the name to skip
-   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
-   * @param srcPath
-   * @param maximumEditLogSeqNum
-   * @return dstPath take file's last edit log seq num as the name
-   */
-  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
-      long maximumEditLogSeqNum) {
-    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
-    return new Path(srcPath.getParent(), fileName);
-  }
-
-  @VisibleForTesting
-  static String formatRecoveredEditsFileName(final long seqid) {
-    return String.format("%019d", seqid);
-  }
-
-  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
-  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
-
-  /**
-   * @param regionDir
-   *          This regions directory in the filesystem.
-   * @return The directory that holds recovered edits files for the region
-   *         <code>regionDir</code>
-   */
-  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
-    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
-  }
-
-  /**
-   * Check whether there is recovered.edits in the region dir
-   * @param conf conf
-   * @param regionInfo the region to check
-   * @throws IOException IOException
-   * @return true if recovered.edits exist in the region dir
-   */
-  public static boolean hasRecoveredEdits(final Configuration conf,
-    final RegionInfo regionInfo) throws IOException {
-    // No recovered.edits for non default replica regions
-    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
-      return false;
-    }
-    //Only default replica region can reach here, so we can use regioninfo
-    //directly without converting it to default replica's regioninfo.
-    Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
-        regionInfo.getEncodedName());
-    NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
-    return files != null && !files.isEmpty();
-  }
-
-
-  /**
-   * Returns sorted set of edit files made by splitter, excluding files
-   * with '.temp' suffix.
-   *
-   * @param walFS WAL FileSystem used to retrieving split edits files.
-   * @param regionDir WAL region dir to look for recovered edits files under.
-   * @return Files in passed <code>regionDir</code> as a sorted set.
-   * @throws IOException
-   */
-  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
-      final Path regionDir) throws IOException {
-    NavigableSet<Path> filesSorted = new TreeSet<>();
-    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
-    if (!walFS.exists(editsdir)) {
-      return filesSorted;
-    }
-    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
-      @Override
-      public boolean accept(Path p) {
-        boolean result = false;
-        try {
-          // Return files and only files that match the editfile names pattern.
-          // There can be other files in this directory other than edit files.
-          // In particular, on error, we'll move aside the bad edit file giving
-          // it a timestamp suffix. See moveAsideBadEditsFile.
-          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
-          result = walFS.isFile(p) && m.matches();
-          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
-          // because it means splitwal thread is writting this file.
-          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
-            result = false;
-          }
-          // Skip SeqId Files
-          if (isSequenceIdFile(p)) {
-            result = false;
-          }
-        } catch (IOException e) {
-          LOG.warn("Failed isFile check on {}", p, e);
-        }
-        return result;
-      }
-    });
-    if (ArrayUtils.isNotEmpty(files)) {
-      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
-    }
-    return filesSorted;
-  }
-
-  /**
-   * Move aside a bad edits file.
-   *
-   * @param walFS WAL FileSystem used to rename bad edits file.
-   * @param edits
-   *          Edits file to move aside.
-   * @return The name of the moved aside file.
-   * @throws IOException
-   */
-  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
-      throws IOException {
-    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
-        + System.currentTimeMillis());
-    if (!walFS.rename(edits, moveAsideName)) {
-      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
-    }
-    return moveAsideName;
-  }
-
-  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
-  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
-  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
-
-  /**
-   * Is the given file a region open sequence id file.
-   */
-  @VisibleForTesting
-  public static boolean isSequenceIdFile(final Path file) {
-    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
-        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
-  }
-
-  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
-      throws IOException {
-    // TODO: Why are we using a method in here as part of our normal region open where
-    // there is no splitting involved? Fix. St.Ack 01/20/2017.
-    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
-    try {
-      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
-      return files != null ? files : new FileStatus[0];
-    } catch (FileNotFoundException e) {
-      return new FileStatus[0];
-    }
-  }
-
-  private static long getMaxSequenceId(FileStatus[] files) {
-    long maxSeqId = -1L;
-    for (FileStatus file : files) {
-      String fileName = file.getPath().getName();
-      try {
-        maxSeqId = Math.max(maxSeqId, Long
-          .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
-      } catch (NumberFormatException ex) {
-        LOG.warn("Invalid SeqId File Name={}", fileName);
-      }
-    }
-    return maxSeqId;
-  }
-
-  /**
-   * Get the max sequence id which is stored in the region directory. -1 if none.
-   */
-  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
-    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
-  }
-
-  /**
-   * Create a file with name as region's max sequence id
-   */
-  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
-      throws IOException {
-    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
-    long maxSeqId = getMaxSequenceId(files);
-    if (maxSeqId > newMaxSeqId) {
-      throw new IOException("The new max sequence id " + newMaxSeqId +
-        " is less than the old max sequence id " + maxSeqId);
-    }
-    // write a new seqId file
-    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
-      newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
-    if (newMaxSeqId != maxSeqId) {
-      try {
-        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
-          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
-        }
-        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
-          maxSeqId);
-      } catch (FileAlreadyExistsException ignored) {
-        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
-      }
-    }
-    // remove old ones
-    for (FileStatus status : files) {
-      if (!newSeqIdFile.equals(status.getPath())) {
-        walFS.delete(status.getPath(), false);
-      }
-    }
-  }
-
-  /**
    * Create a new {@link Reader} for reading logs to split.
-   *
-   * @param file
-   * @return A new Reader instance, caller should close
-   * @throws IOException
-   * @throws CorruptedLogFileException
    */
   protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
@@ -761,7 +386,7 @@ public class WALSplitter {
   }
 
   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
-  throws CorruptedLogFileException, IOException {
+      throws CorruptedLogFileException, IOException {
     try {
       return in.next();
     } catch (EOFException eof) {
@@ -771,9 +396,8 @@ public class WALSplitter {
     } catch (IOException e) {
       // If the IOE resulted from bad file format,
       // then this problem is idempotent and retrying won't help
-      if (e.getCause() != null &&
-          (e.getCause() instanceof ParseException ||
-           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
+      if (e.getCause() != null && (e.getCause() instanceof ParseException
+          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
         LOG.warn("Parse exception from wal {}. Continuing", path, e);
         return null;
       }
@@ -781,19 +405,18 @@ public class WALSplitter {
         throw e;
       }
       CorruptedLogFileException t =
-        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
-            " while parsing wal " + path + ". Marking as corrupted");
+          new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal "
+              + path + ". Marking as corrupted");
       t.initCause(e);
       throw t;
     }
   }
 
   /**
-   * Create a new {@link Writer} for writing log splits.
+   * Create a new {@link WALProvider.Writer} for writing log splits.
    * @return a new Writer instance, caller should close
    */
-  protected Writer createWriter(Path logfile)
-      throws IOException {
+  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
     return walFactory.createRecoveredEditsWriter(walFS, logfile);
   }
 
@@ -826,7 +449,7 @@ public class WALSplitter {
 
     // Wait/notify for when data has been produced by the writer thread,
     // consumed by the reader thread, or an exception occurred
-    public final Object dataAvailable = new Object();
+    final Object dataAvailable = new Object();
 
     void writerThreadError(Throwable t) {
       thrown.compareAndSet(null, t);
@@ -837,7 +460,9 @@ public class WALSplitter {
      */
     void checkForErrors() throws IOException {
       Throwable thrown = this.thrown.get();
-      if (thrown == null) return;
+      if (thrown == null) {
+        return;
+      }
       if (thrown instanceof IOException) {
         throw new IOException(thrown);
       } else {
@@ -847,134 +472,6 @@ public class WALSplitter {
   }
 
   /**
-   * Class which accumulates edits and separates them into a buffer per region
-   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
-   * a predefined threshold.
-   *
-   * Writer threads then pull region-specific buffers from this class.
-   */
-  public static class EntryBuffers {
-    PipelineController controller;
-
-    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    /* Track which regions are currently in the middle of writing. We don't allow
-       an IO thread to pick up bytes from a region if we're already writing
-       data for that region in a different IO thread. */
-    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-
-    long totalBuffered = 0;
-    long maxHeapUsage;
-    boolean splitWriterCreationBounded;
-
-    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
-      this(controller, maxHeapUsage, false);
-    }
-
-    public EntryBuffers(PipelineController controller, long maxHeapUsage,
-        boolean splitWriterCreationBounded){
-      this.controller = controller;
-      this.maxHeapUsage = maxHeapUsage;
-      this.splitWriterCreationBounded = splitWriterCreationBounded;
-    }
-
-    /**
-     * Append a log entry into the corresponding region buffer.
-     * Blocks if the total heap usage has crossed the specified threshold.
-     *
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    public void appendEntry(Entry entry) throws InterruptedException, IOException {
-      WALKey key = entry.getKey();
-
-      RegionEntryBuffer buffer;
-      long incrHeap;
-      synchronized (this) {
-        buffer = buffers.get(key.getEncodedRegionName());
-        if (buffer == null) {
-          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
-          buffers.put(key.getEncodedRegionName(), buffer);
-        }
-        incrHeap= buffer.appendEntry(entry);
-      }
-
-      // If we crossed the chunk threshold, wait for more space to be available
-      synchronized (controller.dataAvailable) {
-        totalBuffered += incrHeap;
-        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
-          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
-          controller.dataAvailable.wait(2000);
-        }
-        controller.dataAvailable.notifyAll();
-      }
-      controller.checkForErrors();
-    }
-
-    /**
-     * @return RegionEntryBuffer a buffer of edits to be written.
-     */
-    synchronized RegionEntryBuffer getChunkToWrite() {
-      // The core part of limiting opening writers is it doesn't return chunk only if the
-      // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
-      // region during splitting. It will flush all the logs in the buffer after splitting
-      // through a threadpool, which means the number of writers it created is under control.
-      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
-        return null;
-      }
-      long biggestSize = 0;
-      byte[] biggestBufferKey = null;
-
-      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
-        long size = entry.getValue().heapSize();
-        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
-          biggestSize = size;
-          biggestBufferKey = entry.getKey();
-        }
-      }
-      if (biggestBufferKey == null) {
-        return null;
-      }
-
-      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
-      currentlyWriting.add(biggestBufferKey);
-      return buffer;
-    }
-
-    void doneWriting(RegionEntryBuffer buffer) {
-      synchronized (this) {
-        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
-        assert removed;
-      }
-      long size = buffer.heapSize();
-
-      synchronized (controller.dataAvailable) {
-        totalBuffered -= size;
-        // We may unblock writers
-        controller.dataAvailable.notifyAll();
-      }
-    }
-
-    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
-      return currentlyWriting.contains(region);
-    }
-
-    public void waitUntilDrained() {
-      synchronized (controller.dataAvailable) {
-        while (totalBuffered > 0) {
-          try {
-            controller.dataAvailable.wait(2000);
-          } catch (InterruptedException e) {
-            LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
-            Thread.interrupted();
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
    * A buffer of some number of edits for a given region.
    * This accumulates edits and also provides a memory optimization in order to
    * share a single byte array instance for the table and region name.
@@ -1026,723 +523,6 @@ public class WALSplitter {
     }
   }
 
-  public static class WriterThread extends Thread {
-    private volatile boolean shouldStop = false;
-    private PipelineController controller;
-    private EntryBuffers entryBuffers;
-    private OutputSink outputSink = null;
-
-    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
-      super(Thread.currentThread().getName() + "-Writer-" + i);
-      this.controller = controller;
-      this.entryBuffers = entryBuffers;
-      outputSink = sink;
-    }
-
-    @Override
-    public void run()  {
-      try {
-        doRun();
-      } catch (Throwable t) {
-        LOG.error("Exiting thread", t);
-        controller.writerThreadError(t);
-      }
-    }
-
-    private void doRun() throws IOException {
-      LOG.trace("Writer thread starting");
-      while (true) {
-        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
-        if (buffer == null) {
-          // No data currently available, wait on some more to show up
-          synchronized (controller.dataAvailable) {
-            if (shouldStop && !this.outputSink.flush()) {
-              return;
-            }
-            try {
-              controller.dataAvailable.wait(500);
-            } catch (InterruptedException ie) {
-              if (!shouldStop) {
-                throw new RuntimeException(ie);
-              }
-            }
-          }
-          continue;
-        }
-
-        assert buffer != null;
-        try {
-          writeBuffer(buffer);
-        } finally {
-          entryBuffers.doneWriting(buffer);
-        }
-      }
-    }
-
-    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
-      outputSink.append(buffer);
-    }
-
-    void finish() {
-      synchronized (controller.dataAvailable) {
-        shouldStop = true;
-        controller.dataAvailable.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * The following class is an abstraction class to provide a common interface to support
-   * different ways of consuming recovered edits.
-   */
-  public static abstract class OutputSink {
-
-    protected PipelineController controller;
-    protected EntryBuffers entryBuffers;
-
-    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
-    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
-        new ConcurrentHashMap<>();
-
-
-    protected final List<WriterThread> writerThreads = Lists.newArrayList();
-
-    /* Set of regions which we've decided should not output edits */
-    protected final Set<byte[]> blacklistedRegions = Collections
-        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
-
-    protected boolean closeAndCleanCompleted = false;
-
-    protected boolean writersClosed = false;
-
-    protected final int numThreads;
-
-    protected CancelableProgressable reporter = null;
-
-    protected AtomicLong skippedEdits = new AtomicLong();
-
-    protected List<Path> splits = null;
-
-    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
-      numThreads = numWriters;
-      this.controller = controller;
-      this.entryBuffers = entryBuffers;
-    }
-
-    void setReporter(CancelableProgressable reporter) {
-      this.reporter = reporter;
-    }
-
-    /**
-     * Start the threads that will pump data from the entryBuffers to the output files.
-     */
-    public synchronized void startWriterThreads() {
-      for (int i = 0; i < numThreads; i++) {
-        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
-        t.start();
-        writerThreads.add(t);
-      }
-    }
-
-    /**
-     *
-     * Update region's maximum edit log SeqNum.
-     */
-    void updateRegionMaximumEditLogSeqNum(Entry entry) {
-      synchronized (regionMaximumEditLogSeqNum) {
-        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
-        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
-        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
-          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
-        }
-      }
-    }
-
-    /**
-     * @return the number of currently opened writers
-     */
-    int getNumOpenWriters() {
-      return this.writers.size();
-    }
-
-    long getSkippedEdits() {
-      return this.skippedEdits.get();
-    }
-
-    /**
-     * Wait for writer threads to dump all info to the sink
-     * @return true when there is no error
-     * @throws IOException
-     */
-    protected boolean finishWriting(boolean interrupt) throws IOException {
-      LOG.debug("Waiting for split writer threads to finish");
-      boolean progress_failed = false;
-      for (WriterThread t : writerThreads) {
-        t.finish();
-      }
-      if (interrupt) {
-        for (WriterThread t : writerThreads) {
-          t.interrupt(); // interrupt the writer threads. We are stopping now.
-        }
-      }
-
-      for (WriterThread t : writerThreads) {
-        if (!progress_failed && reporter != null && !reporter.progress()) {
-          progress_failed = true;
-        }
-        try {
-          t.join();
-        } catch (InterruptedException ie) {
-          IOException iie = new InterruptedIOException();
-          iie.initCause(ie);
-          throw iie;
-        }
-      }
-      controller.checkForErrors();
-      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
-      return (!progress_failed);
-    }
-
-    public abstract List<Path> finishWritingAndClose() throws IOException;
-
-    /**
-     * @return a map from encoded region ID to the number of edits written out for that region.
-     */
-    public abstract Map<byte[], Long> getOutputCounts();
-
-    /**
-     * @return number of regions we've recovered
-     */
-    public abstract int getNumberOfRecoveredRegions();
-
-    /**
-     * @param buffer A WAL Edit Entry
-     * @throws IOException
-     */
-    public abstract void append(RegionEntryBuffer buffer) throws IOException;
-
-    /**
-     * WriterThread call this function to help flush internal remaining edits in buffer before close
-     * @return true when underlying sink has something to flush
-     */
-    public boolean flush() throws IOException {
-      return false;
-    }
-
-    /**
-     * Some WALEdit's contain only KV's for account on what happened to a region.
-     * Not all sinks will want to get all of those edits.
-     *
-     * @return Return true if this sink wants to accept this region-level WALEdit.
-     */
-    public abstract boolean keepRegionEvent(Entry entry);
-  }
-
-  /**
-   * Class that manages the output streams from the log splitting process.
-   */
-  class LogRecoveredEditsOutputSink extends OutputSink {
-
-    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
-        int numWriters) {
-      // More threads could potentially write faster at the expense
-      // of causing more disk seeks as the logs are split.
-      // 3. After a certain setting (probably around 3) the
-      // process will be bound on the reader in the current
-      // implementation anyway.
-      super(controller, entryBuffers, numWriters);
-    }
-
-    /**
-     * @return null if failed to report progress
-     * @throws IOException
-     */
-    @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      boolean isSuccessful = false;
-      List<Path> result = null;
-      try {
-        isSuccessful = finishWriting(false);
-      } finally {
-        result = close();
-        List<IOException> thrown = closeLogWriters(null);
-        if (CollectionUtils.isNotEmpty(thrown)) {
-          throw MultipleIOException.createIOException(thrown);
-        }
-      }
-      if (isSuccessful) {
-        splits = result;
-      }
-      return splits;
-    }
-
-    // delete the one with fewer wal entries
-    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
-        throws IOException {
-      long dstMinLogSeqNum = -1L;
-      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
-        WAL.Entry entry = reader.next();
-        if (entry != null) {
-          dstMinLogSeqNum = entry.getKey().getSequenceId();
-        }
-      } catch (EOFException e) {
-        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
-            dst, e);
-      }
-      if (wap.minLogSeqNum < dstMinLogSeqNum) {
-        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
-            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
-            + walFS.getFileStatus(dst).getLen());
-        if (!walFS.delete(dst, false)) {
-          LOG.warn("Failed deleting of old {}", dst);
-          throw new IOException("Failed deleting of old " + dst);
-        }
-      } else {
-        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
-            + ", length=" + walFS.getFileStatus(wap.p).getLen());
-        if (!walFS.delete(wap.p, false)) {
-          LOG.warn("Failed deleting of {}", wap.p);
-          throw new IOException("Failed deleting of " + wap.p);
-        }
-      }
-    }
-
-    /**
-     * Close all of the output streams.
-     * @return the list of paths written.
-     */
-    List<Path> close() throws IOException {
-      Preconditions.checkState(!closeAndCleanCompleted);
-
-      final List<Path> paths = new ArrayList<>();
-      final List<IOException> thrown = Lists.newArrayList();
-      ThreadPoolExecutor closeThreadPool = Threads
-          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
-            private int count = 1;
-
-            @Override public Thread newThread(Runnable r) {
-              Thread t = new Thread(r, "split-log-closeStream-" + count++);
-              return t;
-            }
-          });
-      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
-      boolean progress_failed;
-      try {
-        progress_failed = executeCloseTask(completionService, thrown, paths);
-      } catch (InterruptedException e) {
-        IOException iie = new InterruptedIOException();
-        iie.initCause(e);
-        throw iie;
-      } catch (ExecutionException e) {
-        throw new IOException(e.getCause());
-      } finally {
-        closeThreadPool.shutdownNow();
-      }
-      if (!thrown.isEmpty()) {
-        throw MultipleIOException.createIOException(thrown);
-      }
-      writersClosed = true;
-      closeAndCleanCompleted = true;
-      if (progress_failed) {
-        return null;
-      }
-      return paths;
-    }
-
-    /**
-     * @param completionService threadPool to execute the closing tasks
-     * @param thrown store the exceptions
-     * @param paths arrayList to store the paths written
-     * @return if close tasks executed successful
-     */
-    boolean executeCloseTask(CompletionService<Void> completionService,
-        List<IOException> thrown, List<Path> paths)
-        throws InterruptedException, ExecutionException {
-      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
-        }
-        completionService.submit(new Callable<Void>() {
-          @Override public Void call() throws Exception {
-            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
-            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
-            paths.add(dst);
-            return null;
-          }
-        });
-      }
-      boolean progress_failed = false;
-      for (int i = 0, n = this.writers.size(); i < n; i++) {
-        Future<Void> future = completionService.take();
-        future.get();
-        if (!progress_failed && reporter != null && !reporter.progress()) {
-          progress_failed = true;
-        }
-      }
-      return progress_failed;
-    }
-
-    Path closeWriter(String encodedRegionName, WriterAndPath wap,
-        List<IOException> thrown) throws IOException{
-      LOG.trace("Closing " + wap.p);
-      try {
-        wap.w.close();
-      } catch (IOException ioe) {
-        LOG.error("Couldn't close log at " + wap.p, ioe);
-        thrown.add(ioe);
-        return null;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
-            + " edits, skipped " + wap.editsSkipped + " edits in "
-            + (wap.nanosSpent / 1000 / 1000) + "ms");
-      }
-      if (wap.editsWritten == 0) {
-        // just remove the empty recovered.edits file
-        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
-          LOG.warn("Failed deleting empty " + wap.p);
-          throw new IOException("Failed deleting empty  " + wap.p);
-        }
-        return null;
-      }
-
-      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-          regionMaximumEditLogSeqNum.get(encodedRegionName));
-      try {
-        if (!dst.equals(wap.p) && walFS.exists(dst)) {
-          deleteOneWithFewerEntries(wap, dst);
-        }
-        // Skip the unit tests which create a splitter that reads and
-        // writes the data without touching disk.
-        // TestHLogSplit#testThreading is an example.
-        if (walFS.exists(wap.p)) {
-          if (!walFS.rename(wap.p, dst)) {
-            throw new IOException("Failed renaming " + wap.p + " to " + dst);
-          }
-          LOG.info("Rename " + wap.p + " to " + dst);
-        }
-      } catch (IOException ioe) {
-        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
-        thrown.add(ioe);
-        return null;
-      }
-      return dst;
-    }
-
-    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
-      if (writersClosed) {
-        return thrown;
-      }
-      if (thrown == null) {
-        thrown = Lists.newArrayList();
-      }
-      try {
-        for (WriterThread t : writerThreads) {
-          while (t.isAlive()) {
-            t.shouldStop = true;
-            t.interrupt();
-            try {
-              t.join(10);
-            } catch (InterruptedException e) {
-              IOException iie = new InterruptedIOException();
-              iie.initCause(e);
-              throw iie;
-            }
-          }
-        }
-      } finally {
-        WriterAndPath wap = null;
-        for (SinkWriter tmpWAP : writers.values()) {
-          try {
-            wap = (WriterAndPath) tmpWAP;
-            wap.w.close();
-          } catch (IOException ioe) {
-            LOG.error("Couldn't close log at " + wap.p, ioe);
-            thrown.add(ioe);
-            continue;
-          }
-          LOG.info(
-              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
-                  / 1000 / 1000) + "ms)");
-        }
-        writersClosed = true;
-      }
-
-      return thrown;
-    }
-
-    /**
-     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
-     * long as multiple threads are always acting on different regions.
-     * @return null if this region shouldn't output any logs
-     */
-    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
-      byte region[] = entry.getKey().getEncodedRegionName();
-      String regionName = Bytes.toString(region);
-      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
-      if (ret != null) {
-        return ret;
-      }
-      // If we already decided that this region doesn't get any output
-      // we don't need to check again.
-      if (blacklistedRegions.contains(region)) {
-        return null;
-      }
-      ret = createWAP(region, entry);
-      if (ret == null) {
-        blacklistedRegions.add(region);
-        return null;
-      }
-      if(reusable) {
-        writers.put(regionName, ret);
-      }
-      return ret;
-    }
-
-    /**
-     * @return a path with a write for that path. caller should close.
-     */
-    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
-      String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-      Path regionedits = getRegionSplitEditsPath(entry,
-          fileBeingSplit.getPath().getName(), tmpDirName, conf);
-      if (regionedits == null) {
-        return null;
-      }
-      FileSystem walFs = FSUtils.getWALFileSystem(conf);
-      if (walFs.exists(regionedits)) {
-        LOG.warn("Found old edits file. It could be the "
-            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
-            + walFs.getFileStatus(regionedits).getLen());
-        if (!walFs.delete(regionedits, false)) {
-          LOG.warn("Failed delete of old {}", regionedits);
-        }
-      }
-      Writer w = createWriter(regionedits);
-      LOG.debug("Creating writer path={}", regionedits);
-      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
-    }
-
-    void filterCellByStore(Entry logEntry) {
-      Map<byte[], Long> maxSeqIdInStores =
-          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
-      if (MapUtils.isEmpty(maxSeqIdInStores)) {
-        return;
-      }
-      // Create the array list for the cells that aren't filtered.
-      // We make the assumption that most cells will be kept.
-      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
-      for (Cell cell : logEntry.getEdit().getCells()) {
-        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-          keptCells.add(cell);
-        } else {
-          byte[] family = CellUtil.cloneFamily(cell);
-          Long maxSeqId = maxSeqIdInStores.get(family);
-          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
-          // or the master was crashed before and we can not get the information.
-          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
-            keptCells.add(cell);
-          }
-        }
-      }
-
-      // Anything in the keptCells array list is still live.
-      // So rather than removing the cells from the array list
-      // which would be an O(n^2) operation, we just replace the list
-      logEntry.getEdit().setCells(keptCells);
-    }
-
-    @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
-      appendBuffer(buffer, true);
-    }
-
-    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
-      List<Entry> entries = buffer.entryBuffer;
-      if (entries.isEmpty()) {
-        LOG.warn("got an empty buffer, skipping");
-        return null;
-      }
-
-      WriterAndPath wap = null;
-
-      long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
-
-        for (Entry logEntry : entries) {
-          if (wap == null) {
-            wap = getWriterAndPath(logEntry, reusable);
-            if (wap == null) {
-              if (LOG.isTraceEnabled()) {
-                // This log spews the full edit. Can be massive in the log. Enable only debugging
-                // WAL lost edit issues.
-                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
-              }
-              return null;
-            }
-          }
-          filterCellByStore(logEntry);
-          if (!logEntry.getEdit().isEmpty()) {
-            wap.w.append(logEntry);
-            this.updateRegionMaximumEditLogSeqNum(logEntry);
-            editsCount++;
-          } else {
-            wap.incrementSkippedEdits(1);
-          }
-        }
-        // Pass along summary statistics
-        wap.incrementEdits(editsCount);
-        wap.incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-          e = e instanceof RemoteException ?
-                  ((RemoteException)e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
-        throw e;
-      }
-      return wap;
-    }
-
-    @Override
-    public boolean keepRegionEvent(Entry entry) {
-      ArrayList<Cell> cells = entry.getEdit().getCells();
-      for (Cell cell : cells) {
-        if (WALEdit.isCompactionMarker(cell)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * @return a map from encoded region ID to the number of edits written out for that region.
-     */
-    @Override
-    public Map<byte[], Long> getOutputCounts() {
-      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
-        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
-      }
-      return ret;
-    }
-
-    @Override
-    public int getNumberOfRecoveredRegions() {
-      return writers.size();
-    }
-  }
-
-  /**
-   *
-   */
-  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
-
-    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
-
-    public BoundedLogWriterCreationOutputSink(PipelineController controller,
-        EntryBuffers entryBuffers, int numWriters) {
-      super(controller, entryBuffers, numWriters);
-    }
-
-    @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      boolean isSuccessful;
-      List<Path> result;
-      try {
-        isSuccessful = finishWriting(false);
-      } finally {
-        result = close();
-      }
-      if (isSuccessful) {
-        splits = result;
-      }
-      return splits;
-    }
-
-    @Override
-    boolean executeCloseTask(CompletionService<Void> completionService,
-        List<IOException> thrown, List<Path> paths)
-        throws InterruptedException, ExecutionException {
-      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
-        LOG.info("Submitting writeThenClose of {}",
-            Arrays.toString(buffer.getValue().encodedRegionName));
-        completionService.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            Path dst = writeThenClose(buffer.getValue());
-            paths.add(dst);
-            return null;
-          }
-        });
-      }
-      boolean progress_failed = false;
-      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
-        Future<Void> future = completionService.take();
-        future.get();
-        if (!progress_failed && reporter != null && !reporter.progress()) {
-          progress_failed = true;
-        }
-      }
-
-      return progress_failed;
-    }
-
-    /**
-     * since the splitting process may create multiple output files, we need a map
-     * regionRecoverStatMap to track the output count of each region.
-     * @return a map from encoded region ID to the number of edits written out for that region.
-     */
-    @Override
-    public Map<byte[], Long> getOutputCounts() {
-      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
-      for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
-        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
-      }
-      return regionRecoverStatMapResult;
-    }
-
-    /**
-     * @return the number of recovered regions
-     */
-    @Override
-    public int getNumberOfRecoveredRegions() {
-      return regionRecoverStatMap.size();
-    }
-
-    /**
-     * Append the buffer to a new recovered edits file, then close it after all done
-     * @param buffer contain all entries of a certain region
-     * @throws IOException when closeWriter failed
-     */
-    @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
-      writeThenClose(buffer);
-    }
-
-    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
-      WriterAndPath wap = appendBuffer(buffer, false);
-      if(wap != null) {
-        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
-        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
-        if (value != null) {
-          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
-          regionRecoverStatMap.put(encodedRegionName, newValue);
-        }
-      }
-
-      Path dst = null;
-      List<IOException> thrown = new ArrayList<>();
-      if(wap != null){
-        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
-      }
-      if (!thrown.isEmpty()) {
-        throw MultipleIOException.createIOException(thrown);
-      }
-      return dst;
-    }
-  }
-
   /**
    * Class wraps the actual writer which writes data out and related statistics
    */
@@ -1771,14 +551,14 @@ public class WALSplitter {
    * Private data structure that wraps a Writer and its Path, also collecting statistics about the
    * data written to this output.
    */
-  private final static class WriterAndPath extends SinkWriter {
-    final Path p;
-    final Writer w;
+  final static class WriterAndPath extends SinkWriter {
+    final Path path;
+    final Writer writer;
     final long minLogSeqNum;
 
-    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
-      this.p = p;
-      this.w = w;
+    WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
+      this.path = path;
+      this.writer = writer;
       this.minLogSeqNum = minLogSeqNum;
     }
   }
@@ -1790,125 +570,4 @@ public class WALSplitter {
       super(s);
     }
   }
-
-  /** A struct used by getMutationsFromWALEntry */
-  public static class MutationReplay implements Comparable<MutationReplay> {
-    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
-      this.type = type;
-      this.mutation = mutation;
-      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
-        // using ASYNC_WAL for relay
-        this.mutation.setDurability(Durability.ASYNC_WAL);
-      }
-      this.nonceGroup = nonceGroup;
-      this.nonce = nonce;
-    }
-
-    public final MutationType type;
-    public final Mutation mutation;
-    public final long nonceGroup;
-    public final long nonce;
-
-    @Override
-    public int compareTo(final MutationReplay d) {
-      return this.mutation.compareTo(d.mutation);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(!(obj instanceof MutationReplay)) {
-        return false;
-      } else {
-        return this.compareTo((MutationReplay)obj) == 0;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return this.mutation.hashCode();
-    }
-  }
-
-  /**
-   * This function is used to construct mutations from a WALEntry. It also
-   * reconstructs WALKey &amp; WALEdit from the passed in WALEntry
-   * @param entry
-   * @param cells
-   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
-   *          extracted from the passed in WALEntry.
-   * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
-   * @throws IOException
-   */
-  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
-      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
-    if (entry == null) {
-      // return an empty array
-      return Collections.emptyList();
-    }
-
-    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
-      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
-    int count = entry.getAssociatedCellCount();
-    List<MutationReplay> mutations = new ArrayList<>();
-    Cell previousCell = null;
-    Mutation m = null;
-    WALKeyImpl key = null;
-    WALEdit val = null;
-    if (logEntry != null) {
-      val = new WALEdit();
-    }
-
-    for (int i = 0; i < count; i++) {
-      // Throw index out of bounds if our cell count is off
-      if (!cells.advance()) {
-        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
-      }
-      Cell cell = cells.current();
-      if (val != null) val.add(cell);
-
-      boolean isNewRowOrType =
-          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
-              || !CellUtil.matchingRows(previousCell, cell);
-      if (isNewRowOrType) {
-        // Create new mutation
-        if (CellUtil.isDelete(cell)) {
-          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-          // Deletes don't have nonces.
-          mutations.add(new MutationReplay(
-              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
-        } else {
-          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-          // Puts might come from increment or append, thus we need nonces.
-          long nonceGroup = entry.getKey().hasNonceGroup()
-              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
-          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
-          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
-        }
-      }
-      if (CellUtil.isDelete(cell)) {
-        ((Delete) m).add(cell);
-      } else {
-        ((Put) m).add(cell);
-      }
-      m.setDurability(durability);
-      previousCell = cell;
-    }
-
-    // reconstruct WALKey
-    if (logEntry != null) {
-      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
-          entry.getKey();
-      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
-      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
-        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
-      }
-      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
-              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
-              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
-      logEntry.setFirst(key);
-      logEntry.setSecond(val);
-    }
-
-    return mutations;
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 084dd89..8087ae8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
@@ -226,14 +226,14 @@ public abstract class AbstractTestDLS {
       for (RegionInfo hri : regions) {
         Path tdir = FSUtils.getWALTableDir(conf, table);
         @SuppressWarnings("deprecation")
-        Path editsdir = WALSplitter
+        Path editsdir = WALSplitUtil
             .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
                 tableName, hri.getEncodedName()));
         LOG.debug("checking edits dir " + editsdir);
         FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
           @Override
           public boolean accept(Path p) {
-            if (WALSplitter.isSequenceIdFile(p)) {
+            if (WALSplitUtil.isSequenceIdFile(p)) {
               return false;
             }
             return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
index d5ec62d..0e90246 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -163,7 +163,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
         FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
           @Override
           public boolean accept(Path p) {
-            if (WALSplitter.isSequenceIdFile(p)) {
+            if (WALSplitUtil.isSequenceIdFile(p)) {
               return false;
             }
             return true;
@@ -244,7 +244,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
         FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
           @Override
           public boolean accept(Path p) {
-            if (WALSplitter.isSequenceIdFile(p)) {
+            if (WALSplitUtil.isSequenceIdFile(p)) {
               return false;
             }
             return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index c09f702..514d0d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -161,7 +161,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -687,7 +687,7 @@ public class TestHRegion {
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
 
-      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -738,7 +738,7 @@ public class TestHRegion {
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
 
-      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -791,7 +791,7 @@ public class TestHRegion {
     Path regiondir = region.getRegionFileSystem().getRegionDir();
     FileSystem fs = region.getRegionFileSystem().getFileSystem();
 
-    Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
     for (int i = 1000; i < 1050; i += 10) {
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
       FSDataOutputStream dos = fs.create(recoveredEdits);
@@ -824,7 +824,7 @@ public class TestHRegion {
 
       assertEquals(0, region.getStoreFileList(columns).size());
 
-      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
       long maxSeqId = 1050;
       long minSeqId = 1000;
@@ -940,7 +940,7 @@ public class TestHRegion {
       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
           this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
 
-      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
       fs.create(recoveredEdits);
@@ -1065,7 +1065,7 @@ public class TestHRegion {
 
         // now write those markers to the recovered edits again.
 
-        Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
         fs.create(recoveredEdits);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 98119db..ae4154f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.AfterClass;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 4a32820..c3f6d04 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -149,7 +149,7 @@ public class TestRecoveredEdits {
     assertTrue(storeFiles.isEmpty());
     region.close();
     Path regionDir = region.getRegionDir(hbaseRootDir, hri);
-    Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
+    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir);
     // This is a little fragile getting this path to a file of 10M of edits.
     Path recoveredEditsFile = new Path(
       System.getProperty("test.build.classes", "target/test-classes"),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index e55f3ee..d9f7766 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -146,7 +146,7 @@ public class TestRecoveredEditsReplayAndAbort {
       FileSystem fs = region.getRegionFileSystem().getFileSystem();
       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
 
-      Path recoveredEditsDir = WALSplitter
+      Path recoveredEditsDir = WALSplitUtil
           .getRegionDirRecoveredEditsDir(regiondir);
       long maxSeqId = 1200;
       long minSeqId = 1000;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 7663313..16c5837 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.junit.After;
@@ -902,15 +903,12 @@ public abstract class AbstractTestWALReplay {
     assertTrue(listStatus.length > 0);
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
         this.fs, this.conf, null, null, null, wals);
-    FileStatus[] listStatus1 = this.fs.listStatus(
-      new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
-          "recovered.edits")), new PathFilter() {
+    FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName),
+        new Path(hri.getEncodedName(), "recovered.edits")),
+      new PathFilter() {
         @Override
         public boolean accept(Path p) {
-          if (WALSplitter.isSequenceIdFile(p)) {
-            return false;
-          }
-          return true;
+          return !WALSplitUtil.isSequenceIdFile(p);
         }
       });
     int editCount = 0;
@@ -956,7 +954,7 @@ public abstract class AbstractTestWALReplay {
     runWALSplit(this.conf);
 
     // here we let the DFSInputStream throw an IOException just after the WALHeader.
-    Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
+    Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
     FSDataInputStream stream = fs.open(editFile);
     stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
     Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
index acc1f55..3a21405 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -167,13 +167,13 @@ public class TestRestoreSnapshotHelper {
       region.initialize();
       Path recoveredEdit =
           FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName());
-      long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
+      long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
 
       // open restored region without set restored flag
       HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
         conf, restoredRegion, htd, null);
       region2.initialize();
-      long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
+      long maxSeqId2 = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
       Assert.assertTrue(maxSeqId2 > maxSeqId);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
index 8ae638c..6386112 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
@@ -66,30 +66,30 @@ public class TestReadWriteSeqIdFiles {
 
   @Test
   public void test() throws IOException {
-    WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
-    assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
-    WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
-    assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
+    WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
+    assertEquals(1000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
+    WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
+    assertEquals(2000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
     // can not write a sequence id which is smaller
     try {
-      WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
+      WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
     } catch (IOException e) {
       // expected
       LOG.info("Expected error", e);
     }
 
-    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
+    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(REGION_DIR);
     FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
-        return WALSplitter.isSequenceIdFile(p);
+        return WALSplitUtil.isSequenceIdFile(p);
       }
     });
     // only one seqid file should exist
     assertEquals(1, files.length);
 
     // verify all seqId files aren't treated as recovered.edits files
-    NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
+    NavigableSet<Path> recoveredEdits = WALSplitUtil.getSplitEditFilesSorted(walFS, REGION_DIR);
     assertEquals(0, recoveredEdits.size());
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index b20b3a5..741d449 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
 import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.junit.ClassRule;
@@ -89,17 +88,17 @@ public class TestWALMethods {
     Path regiondir = util.getDataTestDir("regiondir");
     fs.delete(regiondir, true);
     fs.mkdirs(regiondir);
-    Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
-    String first = WALSplitter.formatRecoveredEditsFileName(-1);
+    Path recoverededits = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
+    String first = WALSplitUtil.formatRecoveredEditsFileName(-1);
     createFile(fs, recoverededits, first);
-    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0));
-    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1));
-    createFile(fs, recoverededits, WALSplitter
+    createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(0));
+    createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(1));
+    createFile(fs, recoverededits, WALSplitUtil
         .formatRecoveredEditsFileName(11));
-    createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2));
-    createFile(fs, recoverededits, WALSplitter
+    createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(2));
+    createFile(fs, recoverededits, WALSplitUtil
         .formatRecoveredEditsFileName(50));
-    String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
+    String last = WALSplitUtil.formatRecoveredEditsFileName(Long.MAX_VALUE);
     createFile(fs, recoverededits, last);
     createFile(fs, recoverededits,
       Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
@@ -108,21 +107,21 @@ public class TestWALMethods {
     FSUtils.setRootDir(walConf, regiondir);
     (new WALFactory(walConf, "dummyLogName")).getWAL(null);
 
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+    NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
     assertEquals(7, files.size());
     assertEquals(files.pollFirst().getName(), first);
     assertEquals(files.pollLast().getName(), last);
     assertEquals(files.pollFirst().getName(),
-      WALSplitter
+        WALSplitUtil
         .formatRecoveredEditsFileName(0));
     assertEquals(files.pollFirst().getName(),
-      WALSplitter
+        WALSplitUtil
         .formatRecoveredEditsFileName(1));
     assertEquals(files.pollFirst().getName(),
-      WALSplitter
+        WALSplitUtil
         .formatRecoveredEditsFileName(2));
     assertEquals(files.pollFirst().getName(),
-      WALSplitter
+        WALSplitUtil
         .formatRecoveredEditsFileName(11));
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index b4e2533..696ddea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -399,7 +399,7 @@ public class TestWALSplit {
     Path regiondir = new Path(tdir,
       RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
     fs.mkdirs(regiondir);
-    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
     assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
     fs.createNewFile(parent); // create a recovered.edits file
     String parentOfParent = p.getParent().getParent().getName();
@@ -414,7 +414,7 @@ public class TestWALSplit {
       new Entry(new WALKeyImpl(encoded,
         TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
         new WALEdit());
-    Path p = WALSplitter.getRegionSplitEditsPath(entry,
+    Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
       FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
     return p;
   }
@@ -422,10 +422,10 @@ public class TestWALSplit {
   @Test
   public void testHasRecoveredEdits() throws IOException {
     Path p = createRecoveredEditsPathForRegion();
-    assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
+    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
     String renamedEdit = p.getName().split("-")[0];
     fs.createNewFile(new Path(p.getParent(), renamedEdit));
-    assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
+    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
   }
 
   private void useDifferentDFSClient() throws IOException {
@@ -1157,7 +1157,7 @@ public class TestWALSplit {
         // After creating writer, simulate region's
         // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
         // region and delete them, excluding files with '.temp' suffix.
-        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+        NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
         if (files != null && !files.isEmpty()) {
           for (Path file : files) {
             if (!this.walFS.delete(file, false)) {
@@ -1243,12 +1243,12 @@ public class TestWALSplit {
       throws IOException {
     Path tdir = FSUtils.getWALTableDir(conf, table);
     @SuppressWarnings("deprecation")
-    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         Bytes.toString(Bytes.toBytes(region))));
     FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
-        if (WALSplitter.isSequenceIdFile(p)) {
+        if (WALSplitUtil.isSequenceIdFile(p)) {
           return false;
         }
         return true;