You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/12/02 10:45:10 UTC

[hbase] branch branch-2 updated: HBASE-23298 Refactor LogRecoveredEditsOutputSink and BoundedLogWriterCreationOutputSink (#832)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 71c913f  HBASE-23298 Refactor LogRecoveredEditsOutputSink and BoundedLogWriterCreationOutputSink (#832)
71c913f is described below

commit 71c913f4da3eff52a5d0c9980e3b463c61c5e46a
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Dec 2 13:30:47 2019 +0800

    HBASE-23298 Refactor LogRecoveredEditsOutputSink and BoundedLogWriterCreationOutputSink (#832)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../RegionReplicaReplicationEndpoint.java          |  27 +-
 .../wal/AbstractRecoveredEditsOutputSink.java      | 270 ++++++++++++
 .../hadoop/hbase/wal/BoundedEntryBuffers.java      |  44 ++
 .../wal/BoundedLogWriterCreationOutputSink.java    | 150 -------
 .../hbase/wal/BoundedRecoveredEditsOutputSink.java | 141 +++++++
 .../org/apache/hadoop/hbase/wal/EntryBuffers.java  |  88 +++-
 .../hbase/wal/LogRecoveredEditsOutputSink.java     | 460 ---------------------
 .../org/apache/hadoop/hbase/wal/OutputSink.java    | 120 +++---
 .../hadoop/hbase/wal/RecoveredEditsOutputSink.java | 155 +++++++
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |  18 +-
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 149 ++-----
 .../apache/hadoop/hbase/wal/TestWALMethods.java    |   7 +-
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  12 +-
 13 files changed, 785 insertions(+), 856 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index d1498a8..707057f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -60,11 +60,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.EntryBuffers;
+import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
 import org.apache.hadoop.hbase.wal.OutputSink;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -158,7 +157,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
   protected void doStop() {
     if (outputSink != null) {
       try {
-        outputSink.finishWritingAndClose();
+        outputSink.close();
       } catch (IOException ex) {
         LOG.warn("Got exception while trying to close OutputSink", ex);
       }
@@ -295,7 +294,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
 
     @Override
     public void append(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.getEntryBuffer();
+      List<Entry> entries = buffer.getEntries();
 
       if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
         return;
@@ -311,12 +310,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
     }
 
-    @Override
-    public boolean flush() throws IOException {
+    void flush() throws IOException {
       // nothing much to do for now. Wait for the Writer threads to finish up
       // append()'ing the data.
       entryBuffers.waitUntilDrained();
-      return super.flush();
     }
 
     @Override
@@ -325,8 +322,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
 
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      finishWriting(true);
+    public List<Path> close() throws IOException {
+      finishWriterThreads(true);
       return null;
     }
 
@@ -341,7 +338,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
 
     AtomicLong getSkippedEditsCounter() {
-      return skippedEdits;
+      return totalSkippedEdits;
     }
 
     /**
@@ -377,13 +374,19 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             skipEdits++;
           }
         }
-        skippedEdits.addAndGet(skipEdits);
+        totalSkippedEdits.addAndGet(skipEdits);
       }
       return requiresReplication;
     }
+
+    @Override
+    protected int getNumOpenWriters() {
+      // NOTE(review): leftover IDE stub; this sink always reports zero open writers.
+      return 0;
+    }
   }
 
-  static class RegionReplicaSinkWriter extends SinkWriter {
+  static class RegionReplicaSinkWriter {
     RegionReplicaOutputSink sink;
     ClusterConnection connection;
     RpcControllerFactory rpcControllerFactory;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..da952eb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
+
+@InterfaceAudience.Private
+abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
+  // Logger is keyed to this class so that messages name the class that actually emits them.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractRecoveredEditsOutputSink.class);
+  // Splitter this sink works for; supplies the filesystem, configuration and writer factory.
+  private final WALSplitter walSplitter;
+  // Highest sequence id appended per encoded region name; read again when a writer is closed.
+  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
+
+  /**
+   * Creates the sink and remembers the splitter it works for.
+   * @param walSplitter splitter whose filesystem, configuration and writer factory this sink uses
+   * @param controller passed through to {@link OutputSink}
+   * @param entryBuffers passed through to {@link OutputSink}
+   * @param numWriters passed through to {@link OutputSink}
+   */
+  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(controller, entryBuffers, numWriters);
+    this.walSplitter = walSplitter;
+  }
+
+  /**
+   * Opens a new recovered edits writer for one region. A file already present at the target
+   * path is deleted first; a failed delete is only logged.
+   * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
+   */
+  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
+    long seqId) throws IOException {
+    String walFileName = walSplitter.getFileBeingSplit().getPath().getName();
+    Path editsPath = getRegionSplitEditsPath(tableName, region, seqId, walFileName,
+      walSplitter.getTmpDirName(), walSplitter.conf);
+    if (walSplitter.walFS.exists(editsPath)) {
+      long staleLength = walSplitter.walFS.getFileStatus(editsPath).getLen();
+      LOG.warn("Found old edits file. It could be the " +
+        "result of a previous failed split attempt. Deleting " + editsPath + ", length=" +
+        staleLength);
+      boolean deleted = walSplitter.walFS.delete(editsPath, false);
+      if (!deleted) {
+        LOG.warn("Failed delete of old {}", editsPath);
+      }
+    }
+    WALProvider.Writer walWriter = walSplitter.createWriter(editsPath);
+    LOG.info("Creating recovered edits writer path={}", editsPath);
+    return new RecoveredEditsWriter(region, editsPath, walWriter, seqId);
+  }
+
+  /**
+   * Closes the given writer and moves its file to the completed recovered edits path.
+   * Close and rename failures are logged and appended to {@code thrown} instead of being thrown.
+   * @param editsWriter writer to close
+   * @param thrown collects the IOExceptions hit while closing or renaming
+   * @return the final path, or null when the close or rename failed or no edits were written
+   * @throws IOException if an empty edits file cannot be deleted
+   */
+  protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
+    List<IOException> thrown) throws IOException {
+    try {
+      editsWriter.writer.close();
+    } catch (IOException ioe) {
+      LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
+      thrown.add(ioe);
+      return null;
+    }
+    LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms)",
+      editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
+      editsWriter.nanosSpent / 1000 / 1000);
+    if (editsWriter.editsWritten == 0) {
+      // just remove the empty recovered.edits file
+      if (walSplitter.walFS.exists(editsWriter.path) &&
+        !walSplitter.walFS.delete(editsWriter.path, false)) {
+        LOG.warn("Failed deleting empty {}", editsWriter.path);
+        throw new IOException("Failed deleting empty  " + editsWriter.path);
+      }
+      return null;
+    }
+
+    // The completed path is derived from the highest sequence id recorded for this region.
+    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
+      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
+    try {
+      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
+        deleteOneWithFewerEntries(editsWriter, dst);
+      }
+      // Skip the unit tests which create a splitter that reads and
+      // writes the data without touching disk.
+      // TestHLogSplit#testThreading is an example.
+      if (walSplitter.walFS.exists(editsWriter.path)) {
+        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+          throw new IOException(
+            "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
+        }
+        LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
+      }
+    } catch (IOException ioe) {
+      LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
+      thrown.add(ioe);
+      return null;
+    }
+    return dst;
+  }
+
+  /**
+   * @return true when at least one cell of the entry's edit is a compaction marker
+   */
+  @Override
+  public boolean keepRegionEvent(WAL.Entry entry) {
+    return entry.getEdit().getCells().stream()
+      .anyMatch(cell -> WALEdit.isCompactionMarker(cell));
+  }
+
+  /**
+   * Records the entry's sequence id as the region's maximum if it is higher than the one
+   * stored so far, or if nothing is stored for the region yet.
+   */
+  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
+    String encodedRegion = Bytes.toString(entry.getKey().getEncodedRegionName());
+    long seqId = entry.getKey().getSequenceId();
+    synchronized (regionMaximumEditLogSeqNum) {
+      Long knownMax = regionMaximumEditLogSeqNum.get(encodedRegion);
+      if (knownMax != null && knownMax >= seqId) {
+        return;
+      }
+      regionMaximumEditLogSeqNum.put(encodedRegion, seqId);
+    }
+  }
+
+  /**
+   * Resolves a clash between the file just written and a file already at {@code dst} by deleting
+   * one of them: {@code dst} is deleted when this writer's minimum sequence id is smaller than
+   * the sequence id of dst's first entry, otherwise the file just written is deleted.
+   * @throws IOException if the chosen file cannot be deleted
+   */
+  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
+    throws IOException {
+    // Stays at -1 when dst yields no first entry (null entry or EOF).
+    long firstSeqIdInDst = -1L;
+    try (WAL.Reader dstReader =
+      walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+      WAL.Entry first = dstReader.next();
+      if (first != null) {
+        firstSeqIdInDst = first.getKey().getSequenceId();
+      }
+    } catch (EOFException e) {
+      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
+        e);
+    }
+    if (editsWriter.minLogSeqNum >= firstSeqIdInDst) {
+      Path ours = editsWriter.path;
+      LOG.warn(
+        "Found existing old edits file and we have less entries. Deleting " + ours +
+          ", length=" + walSplitter.walFS.getFileStatus(ours).getLen());
+      if (!walSplitter.walFS.delete(ours, false)) {
+        LOG.warn("Failed deleting of {}", ours);
+        throw new IOException("Failed deleting of " + ours);
+      }
+      return;
+    }
+    LOG.warn("Found existing old edits file. It could be the result of a previous failed" +
+      " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" +
+      walSplitter.walFS.getFileStatus(dst).getLen());
+    if (!walSplitter.walFS.delete(dst, false)) {
+      LOG.warn("Failed deleting of old {}", dst);
+      throw new IOException("Failed deleting of old " + dst);
+    }
+  }
+
+  /**
+   * Pairs a {@link WALProvider.Writer} with the path it writes to and keeps running statistics
+   * about what has been written through it.
+   */
+  final class RecoveredEditsWriter {
+    /* Number of edits appended through this writer */
+    long editsWritten = 0;
+    /* Number of edits skipped because no cells were left after filtering */
+    long editsSkipped = 0;
+    /* Nanoseconds spent in writeRegionEntries calls that completed */
+    long nanosSpent = 0;
+
+    final byte[] encodedRegionName;
+    final Path path;
+    final WALProvider.Writer writer;
+    final long minLogSeqNum;
+
+    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
+      long minLogSeqNum) {
+      this.encodedRegionName = encodedRegionName;
+      this.path = path;
+      this.writer = writer;
+      this.minLogSeqNum = minLogSeqNum;
+    }
+
+    /* Counts skipped edits both for this writer and in the sink-wide total. */
+    private void incrementSkippedEdits(int skipped) {
+      editsSkipped += skipped;
+      totalSkippedEdits.addAndGet(skipped);
+    }
+
+    /**
+     * Filters each entry by store, appends the entries that still carry cells and updates the
+     * statistics. The written-edits and time counters are only updated when every entry was
+     * processed without an IOException.
+     */
+    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
+      final long begin = System.nanoTime();
+      int appended = 0;
+      try {
+        for (WAL.Entry walEntry : entries) {
+          filterCellByStore(walEntry);
+          if (walEntry.getEdit().isEmpty()) {
+            incrementSkippedEdits(1);
+            continue;
+          }
+          writer.append(walEntry);
+          updateRegionMaximumEditLogSeqNum(walEntry);
+          appended++;
+        }
+      } catch (IOException e) {
+        IOException failure = e;
+        if (e instanceof RemoteException) {
+          failure = ((RemoteException) e).unwrapRemoteException();
+        }
+        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", failure);
+        throw failure;
+      }
+      // Pass along summary statistics
+      editsWritten += appended;
+      nanosSpent += System.nanoTime() - begin;
+    }
+
+    /**
+     * Replaces the entry's cell list with the cells that are still needed: meta-edit cells, cells
+     * of a family with no recorded max sequence id, and cells whose entry sequence id is above
+     * the family's recorded max sequence id. Does nothing when no per-store sequence ids are
+     * known for the region.
+     */
+    private void filterCellByStore(WAL.Entry logEntry) {
+      Map<byte[], Long> storeMaxSeqIds = walSplitter.getRegionMaxSeqIdInStores()
+          .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+      if (MapUtils.isEmpty(storeMaxSeqIds)) {
+        return;
+      }
+      // Presize for the common case in which most cells survive the filter.
+      ArrayList<Cell> survivors = new ArrayList<>(logEntry.getEdit().getCells().size());
+      for (Cell cell : logEntry.getEdit().getCells()) {
+        if (WALEdit.isMetaEditFamily(cell)) {
+          survivors.add(cell);
+          continue;
+        }
+        Long storeMax = storeMaxSeqIds.get(CellUtil.cloneFamily(cell));
+        // Do not skip cell even if the max sequence id is null. Maybe we are in a rolling
+        // upgrade, or the master was crashed before and we can not get the information.
+        if (storeMax == null || storeMax.longValue() < logEntry.getKey().getSequenceId()) {
+          survivors.add(cell);
+        }
+      }
+
+      // Swap in the new list instead of removing cells from the old one, which would be
+      // an O(n^2) operation.
+      logEntry.getEdit().setCells(survivors);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java
new file mode 100644
index 0000000..ed3c8b7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used for {@link BoundedRecoveredEditsOutputSink}. The core of limiting the number of open
+ * writers is that {@link #getChunkToWrite()} hands out a chunk only once the buffered heap size
+ * has reached maxHeapUsage. Thus it doesn't need to create a writer for each region during
+ * splitting. The returned {@link EntryBuffers.RegionEntryBuffer} will be written to a recovered
+ * edits file and the writer closed immediately.
+ * See {@link BoundedRecoveredEditsOutputSink#append(EntryBuffers.RegionEntryBuffer)} for more
+ * details.
+ */
+@InterfaceAudience.Private
+public class BoundedEntryBuffers extends EntryBuffers {
+
+  public BoundedEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) {
+    super(controller, maxHeapUsage);
+  }
+
+  /**
+   * @return null while less than maxHeapUsage is buffered; otherwise whatever the parent returns
+   */
+  @Override
+  synchronized RegionEntryBuffer getChunkToWrite() {
+    boolean belowLimit = totalBuffered < maxHeapUsage;
+    return belowLimit ? null : super.getChunkToWrite();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
deleted file mode 100644
index 77b8f93..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class that manages the output streams from the log splitting process.
- * Bounded means the output streams will be no more than the size of threadpool
- */
-@InterfaceAudience.Private
-public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
-
-  private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
-
-  public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
-      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
-    super(walSplitter, controller, entryBuffers, numWriters);
-  }
-
-  @Override
-  public List<Path> finishWritingAndClose() throws IOException {
-    boolean isSuccessful;
-    List<Path> result;
-    try {
-      isSuccessful = finishWriting(false);
-    } finally {
-      result = close();
-    }
-    if (isSuccessful) {
-      splits = result;
-    }
-    return splits;
-  }
-
-  @Override
-  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
-      List<Path> paths) throws InterruptedException, ExecutionException {
-    for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
-        .entrySet()) {
-      LOG.info("Submitting writeThenClose of {}",
-          Bytes.toString(buffer.getValue().encodedRegionName));
-      completionService.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          Path dst = writeThenClose(buffer.getValue());
-          paths.add(dst);
-          return null;
-        }
-      });
-    }
-    boolean progress_failed = false;
-    for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
-      Future<Void> future = completionService.take();
-      future.get();
-      if (!progress_failed && reporter != null && !reporter.progress()) {
-        progress_failed = true;
-      }
-    }
-
-    return progress_failed;
-  }
-
-  /**
-   * since the splitting process may create multiple output files, we need a map
-   * regionRecoverStatMap to track the output count of each region.
-   * @return a map from encoded region ID to the number of edits written out for that region.
-   */
-  @Override
-  public Map<byte[], Long> getOutputCounts() {
-    Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
-    for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
-      regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
-    }
-    return regionRecoverStatMapResult;
-  }
-
-  /**
-   * @return the number of recovered regions
-   */
-  @Override
-  public int getNumberOfRecoveredRegions() {
-    return regionRecoverStatMap.size();
-  }
-
-  /**
-   * Append the buffer to a new recovered edits file, then close it after all done
-   * @param buffer contain all entries of a certain region
-   * @throws IOException when closeWriter failed
-   */
-  @Override
-  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
-    writeThenClose(buffer);
-  }
-
-  private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
-    WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
-    if (wap != null) {
-      String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
-      Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
-      if (value != null) {
-        Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
-        regionRecoverStatMap.put(encodedRegionName, newValue);
-      }
-    }
-
-    Path dst = null;
-    List<IOException> thrown = new ArrayList<>();
-    if (wap != null) {
-      dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
-    }
-    if (!thrown.isEmpty()) {
-      throw MultipleIOException.createIOException(thrown);
-    }
-    return dst;
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..a258205
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Every region may end up with many recovered edits files, but the number of open writers is
+ * bounded: each {@link #append(EntryBuffers.RegionEntryBuffer)} call creates a writer, writes
+ * one buffer and closes the writer again.
+ * Bounded means the output streams will be no more than the size of threadpool.
+ */
+@InterfaceAudience.Private
+class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class);
+
+  // Since the splitting process may create multiple output files, we need a map
+  // to track the output count of each region. Keyed by the encoded region name as a String:
+  // byte[] keys hash and compare by identity, so equal names held in different arrays would
+  // never be merged into one count.
+  private final ConcurrentHashMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
+  // Need a counter to track the opening writers.
+  private final AtomicInteger openingWritersNum = new AtomicInteger(0);
+
+  public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(walSplitter, controller, entryBuffers, numWriters);
+  }
+
+  /**
+   * Writes one region's buffered entries to a new recovered edits file and closes it right away.
+   * @param buffer buffered entries of one region; an empty buffer is skipped with a warning
+   * @throws IOException if creating or writing the file fails, or if closing it reported errors
+   */
+  @Override
+  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+    List<WAL.Entry> entries = buffer.entries;
+    if (entries.isEmpty()) {
+      LOG.warn("got an empty buffer, skipping");
+      return;
+    }
+    // The key point is create a new writer, write edits then close writer.
+    RecoveredEditsWriter writer =
+      createRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
+        entries.get(0).getKey().getSequenceId());
+    if (writer != null) {
+      openingWritersNum.incrementAndGet();
+      writer.writeRegionEntries(entries);
+      // Add this file's edits to the region's running total.
+      regionEditsWrittenMap.merge(Bytes.toString(buffer.encodedRegionName), writer.editsWritten,
+        Long::sum);
+      List<IOException> thrown = new ArrayList<>();
+      // dst is null when closing failed or the file held no edits; it is recorded either way.
+      Path dst = closeRecoveredEditsWriter(writer, thrown);
+      splits.add(dst);
+      openingWritersNum.decrementAndGet();
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+    }
+  }
+
+  /**
+   * Finishes the writer threads, then writes out whatever is still buffered.
+   * @return the recorded split paths, or null when either step reported failure
+   */
+  @Override
+  public List<Path> close() throws IOException {
+    boolean isSuccessful = true;
+    try {
+      isSuccessful &= finishWriterThreads(false);
+    } finally {
+      // Runs even if finishing the writer threads threw.
+      isSuccessful &= writeRemainingEntryBuffers();
+    }
+    return isSuccessful ? splits : null;
+  }
+
+  /**
+   * Write out the remaining RegionEntryBuffers and close the writers.
+   *
+   * @return true when there is no error.
+   */
+  private boolean writeRemainingEntryBuffers() throws IOException {
+    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
+      closeCompletionService.submit(() -> {
+        append(buffer);
+        return null;
+      });
+    }
+    boolean progressFailed = false;
+    try {
+      // One take() per submitted buffer; get() surfaces a failure of that buffer's append.
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progressFailed && reporter != null && !reporter.progress()) {
+          progressFailed = true;
+        }
+      }
+    } catch (InterruptedException e) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(e);
+      throw iie;
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    return !progressFailed;
+  }
+
+  /**
+   * @return a snapshot mapping each encoded region name to the number of edits written for it,
+   *         sorted and looked up by array content
+   */
+  @Override
+  public Map<byte[], Long> getOutputCounts() {
+    Map<byte[], Long> outputCounts = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    regionEditsWrittenMap.forEach(
+      (region, edits) -> outputCounts.put(Bytes.toBytes(region), edits));
+    return outputCounts;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return regionEditsWrittenMap.size();
+  }
+
+  @Override
+  public int getNumOpenWriters() {
+    return openingWritersNum.get();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
index f0974be..6a2661c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
@@ -18,19 +18,24 @@
 package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 /**
  * Class which accumulates edits and separates them into a buffer per region while simultaneously
  * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
@@ -40,29 +45,22 @@ import org.slf4j.LoggerFactory;
 public class EntryBuffers {
   private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
 
-  PipelineController controller;
+  private final PipelineController controller;
 
-  Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
   /*
    * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
    * up bytes from a region if we're already writing data for that region in a different IO thread.
    */
-  Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
 
-  long totalBuffered = 0;
-  long maxHeapUsage;
-  boolean splitWriterCreationBounded;
+  protected long totalBuffered = 0;
+  protected final long maxHeapUsage;
 
   public EntryBuffers(PipelineController controller, long maxHeapUsage) {
-    this(controller, maxHeapUsage, false);
-  }
-
-  public EntryBuffers(PipelineController controller, long maxHeapUsage,
-      boolean splitWriterCreationBounded) {
     this.controller = controller;
     this.maxHeapUsage = maxHeapUsage;
-    this.splitWriterCreationBounded = splitWriterCreationBounded;
   }
 
   /**
@@ -98,13 +96,6 @@ public class EntryBuffers {
    * @return RegionEntryBuffer a buffer of edits to be written.
    */
   synchronized RegionEntryBuffer getChunkToWrite() {
-    // The core part of limiting opening writers is it doesn't return chunk only if the
-    // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
-    // region during splitting. It will flush all the logs in the buffer after splitting
-    // through a threadpool, which means the number of writers it created is under control.
-    if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
-      return null;
-    }
     long biggestSize = 0;
     byte[] biggestBufferKey = null;
 
@@ -138,6 +129,7 @@ public class EntryBuffers {
     }
   }
 
+  @VisibleForTesting
   synchronized boolean isRegionCurrentlyWriting(byte[] region) {
     return currentlyWriting.contains(region);
   }
@@ -155,4 +147,56 @@ public class EntryBuffers {
       }
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * A buffer of some number of edits for a given region.
+   * This accumulates edits and also provides a memory optimization in order to
+   * share a single byte array instance for the table and region name.
+   * Also tracks memory usage of the accumulated edits.
+   */
+  public static class RegionEntryBuffer implements HeapSize {
+    private long heapInBuffer = 0; // sum of the increments returned by appendEntry
+    final List<WAL.Entry> entries;
+    final TableName tableName;
+    final byte[] encodedRegionName;
+
+    RegionEntryBuffer(TableName tableName, byte[] region) {
+      this.tableName = tableName;
+      this.encodedRegionName = region; // array reference is stored as-is, not copied
+      this.entries = new ArrayList<>();
+    }
+
+    long appendEntry(WAL.Entry entry) { // returns the heap size accounted for this entry
+      internify(entry); // make the entry's key reuse this buffer's table/region name
+      entries.add(entry);
+      // TODO linkedlist entry
+      long incrHeap = entry.getEdit().heapSize() +
+          ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(WAL.Entry entry) {
+      WALKeyImpl k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    @Override
+    public long heapSize() {
+      return heapInBuffer; // only ever updated in appendEntry
+    }
+
+    public byte[] getEncodedRegionName() {
+      return encodedRegionName; // the stored array itself, not a copy
+    }
+
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    public List<WAL.Entry> getEntries() {
+      return this.entries; // the live backing list, not a copy
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
deleted file mode 100644
index 9fc43b1..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
-
-/**
- * Class that manages the output streams from the log splitting process.
- */
-@InterfaceAudience.Private
-public class LogRecoveredEditsOutputSink extends OutputSink {
-  private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
-  private WALSplitter walSplitter;
-  private FileSystem walFS;
-  private Configuration conf;
-
-  public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
-      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
-    // More threads could potentially write faster at the expense
-    // of causing more disk seeks as the logs are split.
-    // 3. After a certain setting (probably around 3) the
-    // process will be bound on the reader in the current
-    // implementation anyway.
-    super(controller, entryBuffers, numWriters);
-    this.walSplitter = walSplitter;
-    this.walFS = walSplitter.walFS;
-    this.conf = walSplitter.conf;
-  }
-
-  /**
-   * @return null if failed to report progress
-   */
-  @Override
-  public List<Path> finishWritingAndClose() throws IOException {
-    boolean isSuccessful = false;
-    List<Path> result = null;
-    try {
-      isSuccessful = finishWriting(false);
-    } finally {
-      result = close();
-      List<IOException> thrown = closeLogWriters(null);
-      if (CollectionUtils.isNotEmpty(thrown)) {
-        throw MultipleIOException.createIOException(thrown);
-      }
-    }
-    if (isSuccessful) {
-      splits = result;
-    }
-    return splits;
-  }
-
-  // delete the one with fewer wal entries
-  private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
-      throws IOException {
-    long dstMinLogSeqNum = -1L;
-    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
-      WAL.Entry entry = reader.next();
-      if (entry != null) {
-        dstMinLogSeqNum = entry.getKey().getSequenceId();
-      }
-    } catch (EOFException e) {
-      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
-        e);
-    }
-    if (wap.minLogSeqNum < dstMinLogSeqNum) {
-      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
-          + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
-          + walFS.getFileStatus(dst).getLen());
-      if (!walFS.delete(dst, false)) {
-        LOG.warn("Failed deleting of old {}", dst);
-        throw new IOException("Failed deleting of old " + dst);
-      }
-    } else {
-      LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
-          + ", length=" + walFS.getFileStatus(wap.path).getLen());
-      if (!walFS.delete(wap.path, false)) {
-        LOG.warn("Failed deleting of {}", wap.path);
-        throw new IOException("Failed deleting of " + wap.path);
-      }
-    }
-  }
-
-  /**
-   * Close all of the output streams.
-   * @return the list of paths written.
-   */
-  List<Path> close() throws IOException {
-    Preconditions.checkState(!closeAndCleanCompleted);
-
-    final List<Path> paths = new ArrayList<>();
-    final List<IOException> thrown = Lists.newArrayList();
-    ThreadPoolExecutor closeThreadPool =
-        Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
-          private int count = 1;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "split-log-closeStream-" + count++);
-            return t;
-          }
-        });
-    CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
-    boolean progress_failed;
-    try {
-      progress_failed = executeCloseTask(completionService, thrown, paths);
-    } catch (InterruptedException e) {
-      IOException iie = new InterruptedIOException();
-      iie.initCause(e);
-      throw iie;
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    } finally {
-      closeThreadPool.shutdownNow();
-    }
-    if (!thrown.isEmpty()) {
-      throw MultipleIOException.createIOException(thrown);
-    }
-    writersClosed = true;
-    closeAndCleanCompleted = true;
-    if (progress_failed) {
-      return null;
-    }
-    return paths;
-  }
-
-  /**
-   * @param completionService threadPool to execute the closing tasks
-   * @param thrown store the exceptions
-   * @param paths arrayList to store the paths written
-   * @return if close tasks executed successful
-   */
-  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
-      List<Path> paths) throws InterruptedException, ExecutionException {
-    for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
-      }
-      completionService.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
-          Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
-          paths.add(dst);
-          return null;
-        }
-      });
-    }
-    boolean progress_failed = false;
-    for (int i = 0, n = this.writers.size(); i < n; i++) {
-      Future<Void> future = completionService.take();
-      future.get();
-      if (!progress_failed && reporter != null && !reporter.progress()) {
-        progress_failed = true;
-      }
-    }
-    return progress_failed;
-  }
-
-  Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
-      List<IOException> thrown) throws IOException {
-    LOG.trace("Closing {}", wap.path);
-    try {
-      wap.writer.close();
-    } catch (IOException ioe) {
-      LOG.error("Could not close log at {}", wap.path, ioe);
-      thrown.add(ioe);
-      return null;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
-          + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
-    }
-    if (wap.editsWritten == 0) {
-      // just remove the empty recovered.edits file
-      if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
-        LOG.warn("Failed deleting empty {}", wap.path);
-        throw new IOException("Failed deleting empty  " + wap.path);
-      }
-      return null;
-    }
-
-    Path dst = getCompletedRecoveredEditsFilePath(wap.path,
-      regionMaximumEditLogSeqNum.get(encodedRegionName));
-    try {
-      if (!dst.equals(wap.path) && walFS.exists(dst)) {
-        deleteOneWithFewerEntries(wap, dst);
-      }
-      // Skip the unit tests which create a splitter that reads and
-      // writes the data without touching disk.
-      // TestHLogSplit#testThreading is an example.
-      if (walFS.exists(wap.path)) {
-        if (!walFS.rename(wap.path, dst)) {
-          throw new IOException("Failed renaming " + wap.path + " to " + dst);
-        }
-        LOG.info("Rename {} to {}", wap.path, dst);
-      }
-    } catch (IOException ioe) {
-      LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
-      thrown.add(ioe);
-      return null;
-    }
-    return dst;
-  }
-
-  private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
-    if (writersClosed) {
-      return thrown;
-    }
-    if (thrown == null) {
-      thrown = Lists.newArrayList();
-    }
-    try {
-      for (WriterThread writerThread : writerThreads) {
-        while (writerThread.isAlive()) {
-          writerThread.setShouldStop(true);
-          writerThread.interrupt();
-          try {
-            writerThread.join(10);
-          } catch (InterruptedException e) {
-            IOException iie = new InterruptedIOException();
-            iie.initCause(e);
-            throw iie;
-          }
-        }
-      }
-    } finally {
-      WALSplitter.WriterAndPath wap = null;
-      for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
-        try {
-          wap = (WALSplitter.WriterAndPath) tmpWAP;
-          wap.writer.close();
-        } catch (IOException ioe) {
-          LOG.error("Couldn't close log at {}", wap.path, ioe);
-          thrown.add(ioe);
-          continue;
-        }
-        LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
-            + (wap.nanosSpent / 1000 / 1000) + "ms)");
-      }
-      writersClosed = true;
-    }
-
-    return thrown;
-  }
-
-  /**
-   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
-   * long as multiple threads are always acting on different regions.
-   * @return null if this region shouldn't output any logs
-   */
-  WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
-    byte[] region = entry.getKey().getEncodedRegionName();
-    String regionName = Bytes.toString(region);
-    WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
-    if (ret != null) {
-      return ret;
-    }
-    // If we already decided that this region doesn't get any output
-    // we don't need to check again.
-    if (blacklistedRegions.contains(region)) {
-      return null;
-    }
-    ret = createWAP(region, entry);
-    if (ret == null) {
-      blacklistedRegions.add(region);
-      return null;
-    }
-    if (reusable) {
-      writers.put(regionName, ret);
-    }
-    return ret;
-  }
-
-  /**
-   * @return a path with a write for that path. caller should close.
-   */
-  WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
-    String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path regionedits = getRegionSplitEditsPath(entry,
-      walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
-    if (regionedits == null) {
-      return null;
-    }
-    FileSystem walFs = FSUtils.getWALFileSystem(conf);
-    if (walFs.exists(regionedits)) {
-      LOG.warn("Found old edits file. It could be the "
-          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
-          + walFs.getFileStatus(regionedits).getLen());
-      if (!walFs.delete(regionedits, false)) {
-        LOG.warn("Failed delete of old {}", regionedits);
-      }
-    }
-    WALProvider.Writer w = walSplitter.createWriter(regionedits);
-    LOG.debug("Creating writer path={}", regionedits);
-    return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
-  }
-
-
-
-  void filterCellByStore(WAL.Entry logEntry) {
-    Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
-        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
-    if (MapUtils.isEmpty(maxSeqIdInStores)) {
-      return;
-    }
-    // Create the array list for the cells that aren't filtered.
-    // We make the assumption that most cells will be kept.
-    ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
-    for (Cell cell : logEntry.getEdit().getCells()) {
-      if (WALEdit.isMetaEditFamily(cell)) {
-        keptCells.add(cell);
-      } else {
-        byte[] family = CellUtil.cloneFamily(cell);
-        Long maxSeqId = maxSeqIdInStores.get(family);
-        // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
-        // or the master was crashed before and we can not get the information.
-        if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
-          keptCells.add(cell);
-        }
-      }
-    }
-
-    // Anything in the keptCells array list is still live.
-    // So rather than removing the cells from the array list
-    // which would be an O(n^2) operation, we just replace the list
-    logEntry.getEdit().setCells(keptCells);
-  }
-
-  @Override
-  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
-    appendBuffer(buffer, true);
-  }
-
-  WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
-      throws IOException {
-    List<WAL.Entry> entries = buffer.entryBuffer;
-    if (entries.isEmpty()) {
-      LOG.warn("got an empty buffer, skipping");
-      return null;
-    }
-
-    WALSplitter.WriterAndPath wap = null;
-
-    long startTime = System.nanoTime();
-    try {
-      int editsCount = 0;
-
-      for (WAL.Entry logEntry : entries) {
-        if (wap == null) {
-          wap = getWriterAndPath(logEntry, reusable);
-          if (wap == null) {
-            // This log spews the full edit. Can be massive in the log. Enable only debugging
-            // WAL lost edit issues.
-            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
-            return null;
-          }
-        }
-        filterCellByStore(logEntry);
-        if (!logEntry.getEdit().isEmpty()) {
-          wap.writer.append(logEntry);
-          this.updateRegionMaximumEditLogSeqNum(logEntry);
-          editsCount++;
-        } else {
-          wap.incrementSkippedEdits(1);
-        }
-      }
-      // Pass along summary statistics
-      wap.incrementEdits(editsCount);
-      wap.incrementNanoTime(System.nanoTime() - startTime);
-    } catch (IOException e) {
-      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-      LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
-      throw e;
-    }
-    return wap;
-  }
-
-  @Override
-  public boolean keepRegionEvent(WAL.Entry entry) {
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    for (Cell cell : cells) {
-      if (WALEdit.isCompactionMarker(cell)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * @return a map from encoded region ID to the number of edits written out for that region.
-   */
-  @Override
-  public Map<byte[], Long> getOutputCounts() {
-    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
-      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
-    }
-    return ret;
-  }
-
-  @Override
-  public int getNumberOfRecoveredRegions() {
-    return writers.size();
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index 729ea8b..b60b889 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -19,17 +19,18 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,36 +45,33 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 public abstract class OutputSink {
   private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
 
-  protected WALSplitter.PipelineController controller;
-  protected EntryBuffers entryBuffers;
+  private final WALSplitter.PipelineController controller;
+  protected final EntryBuffers entryBuffers;
 
-  protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
-  protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
-      new ConcurrentHashMap<>();
-
-  protected final List<WriterThread> writerThreads = Lists.newArrayList();
-
-  /* Set of regions which we've decided should not output edits */
-  protected final Set<byte[]> blacklistedRegions =
-      Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
-
-  protected boolean closeAndCleanCompleted = false;
-
-  protected boolean writersClosed = false;
+  private final List<WriterThread> writerThreads = Lists.newArrayList();
 
   protected final int numThreads;
 
   protected CancelableProgressable reporter = null;
 
-  protected AtomicLong skippedEdits = new AtomicLong();
+  protected final AtomicLong totalSkippedEdits = new AtomicLong();
 
-  protected List<Path> splits = null;
+  protected final List<Path> splits = new ArrayList<>();
+
+  /**
+   * Used when closing this output sink.
+   */
+  protected final ThreadPoolExecutor closeThreadPool;
+  protected final CompletionService<Void> closeCompletionService;
 
   public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
       int numWriters) {
-    numThreads = numWriters;
+    this.numThreads = numWriters;
     this.controller = controller;
     this.entryBuffers = entryBuffers;
+    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
+        Threads.newDaemonThreadFactory("split-log-closeStream-"));
+    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
   }
 
   void setReporter(CancelableProgressable reporter) {
@@ -92,36 +90,13 @@ public abstract class OutputSink {
   }
 
   /**
-   * Update region's maximum edit log SeqNum.
-   */
-  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
-    synchronized (regionMaximumEditLogSeqNum) {
-      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
-      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
-      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
-        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
-      }
-    }
-  }
-
-  /**
-   * @return the number of currently opened writers
-   */
-  int getNumOpenWriters() {
-    return this.writers.size();
-  }
-
-  long getSkippedEdits() {
-    return this.skippedEdits.get();
-  }
-
-  /**
    * Wait for writer threads to dump all info to the sink
+   *
    * @return true when there is no error
    */
-  protected boolean finishWriting(boolean interrupt) throws IOException {
+  protected boolean finishWriterThreads(boolean interrupt) throws IOException {
     LOG.debug("Waiting for split writer threads to finish");
-    boolean progress_failed = false;
+    boolean progressFailed = false;
     for (WriterThread t : writerThreads) {
       t.finish();
     }
@@ -132,8 +107,8 @@ public abstract class OutputSink {
     }
 
     for (WriterThread t : writerThreads) {
-      if (!progress_failed && reporter != null && !reporter.progress()) {
-        progress_failed = true;
+      if (!progressFailed && reporter != null && !reporter.progress()) {
+        progressFailed = true;
       }
       try {
         t.join();
@@ -144,41 +119,42 @@ public abstract class OutputSink {
       }
     }
     controller.checkForErrors();
-    LOG.info("{} split writers finished; closing.", this.writerThreads.size());
-    return (!progress_failed);
+    LOG.info("{} split writer threads finished", this.writerThreads.size());
+    return (!progressFailed);
   }
 
-  public abstract List<Path> finishWritingAndClose() throws IOException;
+  long getTotalSkippedEdits() {
+    return this.totalSkippedEdits.get();
+  }
 
   /**
-   * @return a map from encoded region ID to the number of edits written out for that region.
+   * @return the number of currently opened writers
    */
-  public abstract Map<byte[], Long> getOutputCounts();
+  protected abstract int getNumOpenWriters();
 
   /**
-   * @return number of regions we've recovered
+   * @param buffer A buffer of some number of edits for a given region.
    */
-  public abstract int getNumberOfRecoveredRegions();
+  protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
+
+  protected abstract List<Path> close() throws IOException;
 
   /**
-   * @param buffer A WAL Edit Entry
+   * @return a map from encoded region ID to the number of edits written out for that region.
    */
-  public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
+  protected abstract Map<byte[], Long> getOutputCounts();
 
   /**
-   * WriterThread call this function to help flush internal remaining edits in buffer before close
-   * @return true when underlying sink has something to flush
+   * @return number of regions we've recovered
    */
-  public boolean flush() throws IOException {
-    return false;
-  }
+  protected abstract int getNumberOfRecoveredRegions();
 
   /**
    * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
    * want to get all of those edits.
    * @return Return true if this sink wants to accept this region-level WALEdit.
    */
-  public abstract boolean keepRegionEvent(WAL.Entry entry);
+  protected abstract boolean keepRegionEvent(WAL.Entry entry);
 
   public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
@@ -207,11 +183,11 @@ public abstract class OutputSink {
     private void doRun() throws IOException {
       LOG.trace("Writer thread starting");
       while (true) {
-        WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
           // No data currently available, wait on some more to show up
           synchronized (controller.dataAvailable) {
-            if (shouldStop && !this.outputSink.flush()) {
+            if (shouldStop) {
               return;
             }
             try {
@@ -234,15 +210,11 @@ public abstract class OutputSink {
       }
     }
 
-    private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
       outputSink.append(buffer);
     }
 
-    void setShouldStop(boolean shouldStop) {
-      this.shouldStop = shouldStop;
-    }
-
-    void finish() {
+    private void finish() {
       synchronized (controller.dataAvailable) {
         shouldStop = true;
         controller.dataAvailable.notifyAll();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
new file mode 100644
index 0000000..67b6e67
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Every region only has one recovered edits writer, keyed by its encoded region name.
+ */
+@InterfaceAudience.Private
+class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
+  private final ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();
+
+  public RecoveredEditsOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(walSplitter, controller, entryBuffers, numWriters);
+  }
+
+  @Override
+  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+    List<WAL.Entry> entries = buffer.entries;
+    if (entries.isEmpty()) {
+      LOG.warn("got an empty buffer, skipping");
+      return;
+    }
+    // The sequence id of the first buffered entry is passed along in case a writer gets created.
+    RecoveredEditsWriter writer = getRecoveredEditsWriter(buffer.tableName,
+      buffer.encodedRegionName, entries.get(0).getKey().getSequenceId());
+    if (writer != null) {
+      writer.writeRegionEntries(entries);
+    }
+  }
+
+  /**
+   * Get the cached writer for the given region, creating (and caching) one on first use. This
+   * function is threadsafe so long as multiple threads are always acting on different regions.
+   * @return null if this region shouldn't output any logs
+   */
+  private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
+    long seqId) throws IOException {
+    String regionName = Bytes.toString(region);
+    RecoveredEditsWriter ret = writers.get(regionName);
+    if (ret != null) {
+      return ret;
+    }
+    ret = createRecoveredEditsWriter(tableName, region, seqId);
+    if (ret != null) {
+      writers.put(regionName, ret);
+    }
+    return ret;
+  }
+
+  @Override
+  public List<Path> close() throws IOException {
+    boolean isSuccessful = true;
+    try {
+      isSuccessful &= finishWriterThreads(false);
+    } finally {
+      // Always attempt to close the writers, even if stopping the writer threads threw.
+      isSuccessful &= closeWriters();
+    }
+    return isSuccessful ? splits : null;
+  }
+
+  /**
+   * Close all of the output streams, submitting one close task per writer to
+   * {@code closeCompletionService} and waiting for all of them.
+   * @return false if the reporter indicated a progress failure while waiting, true otherwise
+   */
+  private boolean closeWriters() throws IOException {
+    // Shared with every submitted close task; synchronized in case those tasks run concurrently.
+    List<IOException> thrown = java.util.Collections.synchronizedList(Lists.newArrayList());
+    for (RecoveredEditsWriter writer : writers.values()) {
+      closeCompletionService.submit(() -> {
+        Path dst = closeRecoveredEditsWriter(writer, thrown);
+        splits.add(dst);
+        return null;
+      });
+    }
+    boolean progressFailed = false;
+    try {
+      for (int i = 0, n = this.writers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progressFailed && reporter != null && !reporter.progress()) {
+          progressFailed = true;
+        }
+      }
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    if (!thrown.isEmpty()) {
+      throw MultipleIOException.createIOException(thrown);
+    }
+    return !progressFailed;
+  }
+
+  @Override
+  public Map<byte[], Long> getOutputCounts() {
+    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
+      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+    }
+    return ret;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return writers.size();
+  }
+
+  @Override
+  public int getNumOpenWriters() {
+    return writers.size();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index b9ccc49..d68549f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -162,7 +162,9 @@ public final class WALSplitUtil {
    * named for the sequenceid in the passed <code>logEntry</code>: e.g.
    * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
    * RECOVERED_EDITS_DIR under the region creating it if necessary.
-   * @param walEntry walEntry to recover
+   * @param tableName the table name
+   * @param encodedRegionName the encoded region name
+   * @param sedId the sequence id which is used to generate the file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
    * @param tmpDirName of the directory used to sideline old recovered edits file
    * @param conf configuration
@@ -171,12 +173,12 @@ public final class WALSplitUtil {
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
-      String tmpDirName, Configuration conf) throws IOException {
+  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
+      String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
     FileSystem walFS = FSUtils.getWALFileSystem(conf);
-    Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
-    String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
-    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+    Path tableDir = FSUtils.getWALTableDir(conf, tableName);
+    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
+    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
     Path dir = getRegionDirRecoveredEditsDir(regionDir);
 
     if (walFS.exists(dir) && walFS.isFile(dir)) {
@@ -184,7 +186,7 @@ public final class WALSplitUtil {
       if (!walFS.exists(tmp)) {
         walFS.mkdirs(tmp);
       }
-      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
       LOG.warn("Found existing old file: {}. It could be some "
           + "leftover of an old installation. It should be a folder instead. "
           + "So moving it to {}",
@@ -200,7 +202,7 @@ public final class WALSplitUtil {
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
+    String fileName = formatRecoveredEditsFileName(sedId);
     fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 4820f20..c60c882 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -49,12 +47,10 @@ import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -106,11 +102,12 @@ public class WALSplitter {
   // the file being split currently
   private FileStatus fileBeingSplit;
 
-  // if we limit the number of writers opened for sinking recovered edits
-  private final boolean splitWriterCreationBounded;
+  private final String tmpDirName;
 
   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
-
+  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
+  public final static String SPLIT_WAL_WRITER_THREADS =
+      "hbase.regionserver.hlog.splitlog.writer.threads";
 
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
@@ -126,20 +123,21 @@ public class WALSplitter {
 
     this.walFactory = factory;
     PipelineController controller = new PipelineController();
+    this.tmpDirName =
+      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
 
-    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
-
-    entryBuffers = new EntryBuffers(controller,
-        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
-        splitWriterCreationBounded);
-
-    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    // if we limit the number of writers opened for sinking recovered edits
+    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
+    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
     if (splitWriterCreationBounded) {
+      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
       outputSink =
-          new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
+          new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     } else {
+      entryBuffers = new EntryBuffers(controller, bufferSize);
       outputSink =
-          new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+          new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     }
   }
 
@@ -151,6 +149,10 @@ public class WALSplitter {
     return fileBeingSplit;
   }
 
+  String getTmpDirName() {
+    return this.tmpDirName;
+  }
+
   Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
     return regionMaxSeqIdInStores;
   }
@@ -214,7 +216,7 @@ public class WALSplitter {
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     Path logPath = logfile.getPath();
     boolean outputSinkStarted = false;
-    boolean progress_failed = false;
+    boolean progressFailed = false;
     int editsCount = 0;
     int editsSkipped = 0;
 
@@ -229,7 +231,7 @@ public class WALSplitter {
           logLength);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
-        progress_failed = true;
+        progressFailed = true;
         return false;
       }
       logFileReader = getReader(logfile, skipErrors, reporter);
@@ -287,11 +289,11 @@ public class WALSplitter {
         if (editsCount % interval == 0
             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
           numOpenedFilesLastCheck = this.getNumOpenWriters();
-          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
               + " edits, skipped " + editsSkipped + " edits.";
           status.setStatus("Split " + countsStr);
           if (reporter != null && !reporter.progress()) {
-            progress_failed = true;
+            progressFailed = true;
             return false;
           }
         }
@@ -325,9 +327,9 @@ public class WALSplitter {
       try {
         if (outputSinkStarted) {
           // Set progress_failed to true as the immediate following statement will reset its value
-          // when finishWritingAndClose() throws exception, progress_failed has the right value
-          progress_failed = true;
-          progress_failed = outputSink.finishWritingAndClose() == null;
+          // when close() throws exception, progress_failed has the right value
+          progressFailed = true;
+          progressFailed = outputSink.close() == null;
         }
       } finally {
         long processCost = EnvironmentEdgeManager.currentTime() - startTS;
@@ -336,18 +338,18 @@ public class WALSplitter {
             outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
             " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
             StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
-            ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
+            ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
         LOG.info(msg);
         status.markComplete(msg);
       }
     }
-    return !progress_failed;
+    return !progressFailed;
   }
 
   /**
    * Create a new {@link Reader} for reading logs to split.
    */
-  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
     Path path = file.getPath();
     long length = file.getLen();
@@ -391,7 +393,7 @@ public class WALSplitter {
     return in;
   }
 
-  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
       throws CorruptedLogFileException, IOException {
     try {
       return in.next();
@@ -474,98 +476,6 @@ public class WALSplitter {
     }
   }
 
-  /**
-   * A buffer of some number of edits for a given region.
-   * This accumulates edits and also provides a memory optimization in order to
-   * share a single byte array instance for the table and region name.
-   * Also tracks memory usage of the accumulated edits.
-   */
-  public static class RegionEntryBuffer implements HeapSize {
-    long heapInBuffer = 0;
-    List<Entry> entryBuffer;
-    TableName tableName;
-    byte[] encodedRegionName;
-
-    RegionEntryBuffer(TableName tableName, byte[] region) {
-      this.tableName = tableName;
-      this.encodedRegionName = region;
-      this.entryBuffer = new ArrayList<>();
-    }
-
-    long appendEntry(Entry entry) {
-      internify(entry);
-      entryBuffer.add(entry);
-      long incrHeap = entry.getEdit().heapSize() +
-        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
-        0; // TODO linkedlist entry
-      heapInBuffer += incrHeap;
-      return incrHeap;
-    }
-
-    private void internify(Entry entry) {
-      WALKeyImpl k = entry.getKey();
-      k.internTableName(this.tableName);
-      k.internEncodedRegionName(this.encodedRegionName);
-    }
-
-    @Override
-    public long heapSize() {
-      return heapInBuffer;
-    }
-
-    public byte[] getEncodedRegionName() {
-      return encodedRegionName;
-    }
-
-    public List<Entry> getEntryBuffer() {
-      return entryBuffer;
-    }
-
-    public TableName getTableName() {
-      return tableName;
-    }
-  }
-
-  /**
-   * Class wraps the actual writer which writes data out and related statistics
-   */
-  public abstract static class SinkWriter {
-    /* Count of edits written to this path */
-    long editsWritten = 0;
-    /* Count of edits skipped to this path */
-    long editsSkipped = 0;
-    /* Number of nanos spent writing to this log */
-    long nanosSpent = 0;
-
-    void incrementEdits(int edits) {
-      editsWritten += edits;
-    }
-
-    void incrementSkippedEdits(int skipped) {
-      editsSkipped += skipped;
-    }
-
-    void incrementNanoTime(long nanos) {
-      nanosSpent += nanos;
-    }
-  }
-
-  /**
-   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
-   * data written to this output.
-   */
-  final static class WriterAndPath extends SinkWriter {
-    final Path path;
-    final Writer writer;
-    final long minLogSeqNum;
-
-    WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
-      this.path = path;
-      this.writer = writer;
-      this.minLogSeqNum = minLogSeqNum;
-    }
-  }
-
   static class CorruptedLogFileException extends Exception {
     private static final long serialVersionUID = 1L;
 
@@ -582,6 +492,5 @@ public class WALSplitter {
     CorruptedLogFileException(String message, Throwable cause) {
       super(message, cause);
     }
-
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index 741d449..4d6600d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -134,7 +133,7 @@ public class TestWALMethods {
 
   @Test
   public void testRegionEntryBuffer() throws Exception {
-    WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer(
+    EntryBuffers.RegionEntryBuffer reb = new EntryBuffers.RegionEntryBuffer(
         TEST_TABLE, TEST_REGION);
     assertEquals(0, reb.heapSize());
 
@@ -153,7 +152,7 @@ public class TestWALMethods {
     assertTrue(sink.totalBuffered > 0);
     long amountInChunk = sink.totalBuffered;
     // Get a chunk
-    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    EntryBuffers.RegionEntryBuffer chunk = sink.getChunkToWrite();
     assertEquals(chunk.heapSize(), amountInChunk);
 
     // Make sure it got marked that a thread is "working on this"
@@ -172,7 +171,7 @@ public class TestWALMethods {
     // to get the second
     sink.doneWriting(chunk);
 
-    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    EntryBuffers.RegionEntryBuffer chunk2 = sink.getChunkToWrite();
     assertNotNull(chunk2);
     assertNotSame(chunk, chunk2);
     long amountInChunk2 = sink.totalBuffered;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index cfeaac8..8ddd0ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -407,15 +407,15 @@ public class TestWALSplit {
     WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
   }
 
-  private Path createRecoveredEditsPathForRegion() throws  IOException{
+  private Path createRecoveredEditsPathForRegion() throws IOException {
     byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
     long now = System.currentTimeMillis();
-    Entry entry =
-      new Entry(new WALKeyImpl(encoded,
-        TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+    Entry entry = new Entry(
+        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
         new WALEdit());
-    Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
-      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
+    Path p = WALSplitUtil
+        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
+            TMPDIRNAME, conf);
     return p;
   }