You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/01/02 20:12:31 UTC

hbase git commit: HBASE-19358 Improve the stability of splitting log when do fail over

Repository: hbase
Updated Branches:
  refs/heads/master 4e9f4abb1 -> f6f57d38f


HBASE-19358 Improve the stability of splitting log when do fail over

Signed-off-by: Yu Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f6f57d38
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f6f57d38
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f6f57d38

Branch: refs/heads/master
Commit: f6f57d38f750c34e2353f605f815c019c8ba3f88
Parents: 4e9f4ab
Author: Jingyun Tian <ti...@gmail.com>
Authored: Tue Jan 2 19:17:34 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Jan 3 04:11:40 2018 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 400 +++++++++++++------
 .../TestWALReplayBoundedLogWriterCreation.java  |  35 ++
 .../TestWALSplitBoundedLogWriterCreation.java   |  44 ++
 3 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 817e925..328390e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -18,11 +18,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,6 +25,7 @@ import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +64,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.SplitLogManager;
@@ -77,12 +74,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -99,6 +90,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region, for
@@ -138,6 +139,12 @@ public class WALSplitter {
   // the file being split currently
   private FileStatus fileBeingSplit;
 
+  // if we limit the number of writers opened for sinking recovered edits
+  private final boolean splitWriterCreationBounded;
+
+  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+
+
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
@@ -154,11 +161,19 @@ public class WALSplitter {
     this.walFactory = factory;
     PipelineController controller = new PipelineController();
 
+    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+
     entryBuffers = new EntryBuffers(controller,
-        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
+        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
+        splitWriterCreationBounded);
 
     int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+    if(splitWriterCreationBounded){
+      outputSink = new BoundedLogWriterCreationOutputSink(
+          controller, entryBuffers, numWriterThreads);
+    }else {
+      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+    }
   }
 
   /**
@@ -840,10 +855,17 @@ public class WALSplitter {
 
     long totalBuffered = 0;
     long maxHeapUsage;
+    boolean splitWriterCreationBounded;
 
     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+      this(controller, maxHeapUsage, false);
+    }
+
+    public EntryBuffers(PipelineController controller, long maxHeapUsage,
+        boolean splitWriterCreationBounded){
       this.controller = controller;
       this.maxHeapUsage = maxHeapUsage;
+      this.splitWriterCreationBounded = splitWriterCreationBounded;
     }
 
     /**
@@ -884,6 +906,13 @@ public class WALSplitter {
      * @return RegionEntryBuffer a buffer of edits to be written.
      */
     synchronized RegionEntryBuffer getChunkToWrite() {
+      // The core part of limiting opening writers is it doesn't return chunk only if the
+      // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
+      // region during splitting. It will flush all the logs in the buffer after splitting
+      // through a threadpool, which means the number of writers it created is under control.
+      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
+        return null;
+      }
       long biggestSize = 0;
       byte[] biggestBufferKey = null;
 
@@ -1062,11 +1091,10 @@ public class WALSplitter {
     protected PipelineController controller;
     protected EntryBuffers entryBuffers;
 
-    protected Map<byte[], SinkWriter> writers = Collections
-        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
+    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+        new ConcurrentHashMap<>();
 
-    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
-        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
 
     protected final List<WriterThread> writerThreads = Lists.newArrayList();
 
@@ -1113,11 +1141,10 @@ public class WALSplitter {
      */
     void updateRegionMaximumEditLogSeqNum(Entry entry) {
       synchronized (regionMaximumEditLogSeqNum) {
-        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
-            .getEncodedRegionName());
+        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
         if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
-          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
-              .getSequenceId());
+          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
         }
       }
     }
@@ -1277,87 +1304,24 @@ public class WALSplitter {
      * Close all of the output streams.
      * @return the list of paths written.
      */
-    private List<Path> close() throws IOException {
+    List<Path> close() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
 
       final List<Path> paths = new ArrayList<>();
       final List<IOException> thrown = Lists.newArrayList();
-      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
-        TimeUnit.SECONDS, new ThreadFactory() {
-          private int count = 1;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "split-log-closeStream-" + count++);
-            return t;
-          }
-        });
-      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
-      for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
-        }
-        completionService.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
-            if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
-            try {
-              wap.w.close();
-            } catch (IOException ioe) {
-              LOG.error("Couldn't close log at " + wap.p, ioe);
-              thrown.add(ioe);
-              return null;
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
-                + " edits, skipped " + wap.editsSkipped + " edits in "
-                + (wap.nanosSpent / 1000 / 1000) + "ms");
-            }
-            if (wap.editsWritten == 0) {
-              // just remove the empty recovered.edits file
-              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
-                LOG.warn("Failed deleting empty " + wap.p);
-                throw new IOException("Failed deleting empty  " + wap.p);
-              }
-              return null;
-            }
+      ThreadPoolExecutor closeThreadPool = Threads
+          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+            private int count = 1;
 
-            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-              regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
-            try {
-              if (!dst.equals(wap.p) && fs.exists(dst)) {
-                deleteOneWithFewerEntries(wap, dst);
-              }
-              // Skip the unit tests which create a splitter that reads and
-              // writes the data without touching disk.
-              // TestHLogSplit#testThreading is an example.
-              if (fs.exists(wap.p)) {
-                if (!fs.rename(wap.p, dst)) {
-                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
-                }
-                LOG.info("Rename " + wap.p + " to " + dst);
-              }
-            } catch (IOException ioe) {
-              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
-              thrown.add(ioe);
-              return null;
+            @Override public Thread newThread(Runnable r) {
+              Thread t = new Thread(r, "split-log-closeStream-" + count++);
+              return t;
             }
-            paths.add(dst);
-            return null;
-          }
-        });
-      }
-
-      boolean progress_failed = false;
+          });
+      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
+      boolean progress_failed;
       try {
-        for (int i = 0, n = this.writers.size(); i < n; i++) {
-          Future<Void> future = completionService.take();
-          future.get();
-          if (!progress_failed && reporter != null && !reporter.progress()) {
-            progress_failed = true;
-          }
-        }
+        progress_failed = executeCloseTask(completionService, thrown, paths);
       } catch (InterruptedException e) {
         IOException iie = new InterruptedIOException();
         iie.initCause(e);
@@ -1367,7 +1331,6 @@ public class WALSplitter {
       } finally {
         closeThreadPool.shutdownNow();
       }
-
       if (!thrown.isEmpty()) {
         throw MultipleIOException.createIOException(thrown);
       }
@@ -1379,6 +1342,88 @@ public class WALSplitter {
       return paths;
     }
 
+    /**
+     * @param completionService threadPool to execute the closing tasks
+     * @param thrown store the exceptions
+     * @param paths arrayList to store the paths written
+     * @return if close tasks executed successful
+     */
+    boolean executeCloseTask(CompletionService<Void> completionService,
+        List<IOException> thrown, List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
+        }
+        completionService.submit(new Callable<Void>() {
+          @Override public Void call() throws Exception {
+            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
+            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+      boolean progress_failed = false;
+      for (int i = 0, n = this.writers.size(); i < n; i++) {
+        Future<Void> future = completionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+      return progress_failed;
+    }
+
+    Path closeWriter(String encodedRegionName, WriterAndPath wap,
+        List<IOException> thrown) throws IOException{
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Closing " + wap.p);
+      }
+      try {
+        wap.w.close();
+      } catch (IOException ioe) {
+        LOG.error("Couldn't close log at " + wap.p, ioe);
+        thrown.add(ioe);
+        return null;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
+            + " edits, skipped " + wap.editsSkipped + " edits in "
+            + (wap.nanosSpent / 1000 / 1000) + "ms");
+      }
+      if (wap.editsWritten == 0) {
+        // just remove the empty recovered.edits file
+        if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+          LOG.warn("Failed deleting empty " + wap.p);
+          throw new IOException("Failed deleting empty  " + wap.p);
+        }
+        return null;
+      }
+
+      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+          regionMaximumEditLogSeqNum.get(encodedRegionName));
+      try {
+        if (!dst.equals(wap.p) && fs.exists(dst)) {
+          deleteOneWithFewerEntries(wap, dst);
+        }
+        // Skip the unit tests which create a splitter that reads and
+        // writes the data without touching disk.
+        // TestHLogSplit#testThreading is an example.
+        if (fs.exists(wap.p)) {
+          if (!fs.rename(wap.p, dst)) {
+            throw new IOException("Failed renaming " + wap.p + " to " + dst);
+          }
+          LOG.info("Rename " + wap.p + " to " + dst);
+        }
+      } catch (IOException ioe) {
+        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        thrown.add(ioe);
+        return null;
+      }
+      return dst;
+    }
+
     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
       if (writersClosed) {
         return thrown;
@@ -1402,20 +1447,19 @@ public class WALSplitter {
           }
         }
       } finally {
-        synchronized (writers) {
-          WriterAndPath wap = null;
-          for (SinkWriter tmpWAP : writers.values()) {
-            try {
-              wap = (WriterAndPath) tmpWAP;
-              wap.w.close();
-            } catch (IOException ioe) {
-              LOG.error("Couldn't close log at " + wap.p, ioe);
-              thrown.add(ioe);
-              continue;
-            }
-            LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
-                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        WriterAndPath wap = null;
+        for (SinkWriter tmpWAP : writers.values()) {
+          try {
+            wap = (WriterAndPath) tmpWAP;
+            wap.w.close();
+          } catch (IOException ioe) {
+            LOG.error("Couldn't close log at " + wap.p, ioe);
+            thrown.add(ioe);
+            continue;
           }
+          LOG.info(
+              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+                  / 1000 / 1000) + "ms)");
         }
         writersClosed = true;
       }
@@ -1428,9 +1472,10 @@ public class WALSplitter {
      * long as multiple threads are always acting on different regions.
      * @return null if this region shouldn't output any logs
      */
-    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
       byte region[] = entry.getKey().getEncodedRegionName();
-      WriterAndPath ret = (WriterAndPath) writers.get(region);
+      String regionName = Bytes.toString(region);
+      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
       if (ret != null) {
         return ret;
       }
@@ -1444,14 +1489,16 @@ public class WALSplitter {
         blacklistedRegions.add(region);
         return null;
       }
-      writers.put(region, ret);
+      if(reusable) {
+        writers.put(regionName, ret);
+      }
       return ret;
     }
 
     /**
      * @return a path with a write for that path. caller should close.
      */
-    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
+    WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
       if (regionedits == null) {
         return null;
@@ -1469,7 +1516,7 @@ public class WALSplitter {
       return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
-    private void filterCellByStore(Entry logEntry) {
+    void filterCellByStore(Entry logEntry) {
       Map<byte[], Long> maxSeqIdInStores =
           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
@@ -1500,10 +1547,14 @@ public class WALSplitter {
 
     @Override
     public void append(RegionEntryBuffer buffer) throws IOException {
+      appendBuffer(buffer, true);
+    }
+
+    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
-        return;
+        return null;
       }
 
       WriterAndPath wap = null;
@@ -1514,14 +1565,14 @@ public class WALSplitter {
 
         for (Entry logEntry : entries) {
           if (wap == null) {
-            wap = getWriterAndPath(logEntry);
+            wap = getWriterAndPath(logEntry, reusable);
             if (wap == null) {
               if (LOG.isTraceEnabled()) {
                 // This log spews the full edit. Can be massive in the log. Enable only debugging
                 // WAL lost edit issues.
                 LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
               }
-              return;
+              return null;
             }
           }
           filterCellByStore(logEntry);
@@ -1542,6 +1593,7 @@ public class WALSplitter {
         LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
         throw e;
       }
+      return wap;
     }
 
     @Override
@@ -1561,10 +1613,8 @@ public class WALSplitter {
     @Override
     public Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      synchronized (writers) {
-        for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
-          ret.put(entry.getKey(), entry.getValue().editsWritten);
-        }
+      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
+        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
       }
       return ret;
     }
@@ -1576,6 +1626,114 @@ public class WALSplitter {
   }
 
   /**
+   *
+   */
+  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
+
+    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+    public BoundedLogWriterCreationOutputSink(PipelineController controller,
+        EntryBuffers entryBuffers, int numWriters) {
+      super(controller, entryBuffers, numWriters);
+    }
+
+    @Override
+    public List<Path> finishWritingAndClose() throws IOException {
+      boolean isSuccessful;
+      List<Path> result;
+      try {
+        isSuccessful = finishWriting(false);
+      } finally {
+        result = close();
+      }
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
+    }
+
+    @Override
+    boolean executeCloseTask(CompletionService<Void> completionService,
+        List<IOException> thrown, List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
+        LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
+        completionService.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            Path dst = writeThenClose(buffer.getValue());
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+      boolean progress_failed = false;
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = completionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+
+      return progress_failed;
+    }
+
+    /**
+     * since the splitting process may create multiple output files, we need a map
+     * regionRecoverStatMap to track the output count of each region.
+     * @return a map from encoded region ID to the number of edits written out for that region.
+     */
+    @Override
+    public Map<byte[], Long> getOutputCounts() {
+      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+      for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
+        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+      }
+      return regionRecoverStatMapResult;
+    }
+
+    /**
+     * @return the number of recovered regions
+     */
+    @Override
+    public int getNumberOfRecoveredRegions() {
+      return regionRecoverStatMap.size();
+    }
+
+    /**
+     * Append the buffer to a new recovered edits file, then close it after all done
+     * @param buffer contain all entries of a certain region
+     * @throws IOException when closeWriter failed
+     */
+    @Override
+    public void append(RegionEntryBuffer buffer) throws IOException {
+      writeThenClose(buffer);
+    }
+
+    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+      WriterAndPath wap = appendBuffer(buffer, false);
+      if(wap != null) {
+        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+        if (value != null) {
+          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+          regionRecoverStatMap.put(encodedRegionName, newValue);
+        }
+      }
+
+      Path dst = null;
+      List<IOException> thrown = new ArrayList<>();
+      if(wap != null){
+        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+      }
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+      return dst;
+    }
+  }
+
+  /**
    * Class wraps the actual writer which writes data out and related statistics
    */
   public abstract static class SinkWriter {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
new file mode 100644
index 0000000..55bbeaf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+
+@Category(MediumTests.class)
+public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALReplay.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
new file mode 100644
index 0000000..844cb3a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+
+  /**
+   * The logic of this test has conflict with the limit writers split logic, skip this test
+   */
+  @Test(timeout=300000)
+  @Ignore
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    super.testThreadingSlowWriterSmallBuffer();
+  }
+}
+