You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/12/14 00:14:40 UTC

[12/50] [abbrv] hbase git commit: HBASE-19358 Improve the stability of splitting log when do fail over

HBASE-19358 Improve the stability of splitting log when do fail over

Signed-off-by: Yu Li <li...@apache.org>

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4f46f53
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4f46f53
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4f46f53

Branch: refs/heads/branch-1.3
Commit: e4f46f53b737d0a5ea0c4c079b6182a5d918e91d
Parents: cd1726f
Author: Jingyun Tian <ti...@gmail.com>
Authored: Tue Jan 2 17:21:32 2018 +0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 12 18:08:17 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 358 +++++++++++++------
 .../TestWALReplayBoundedLogWriterCreation.java  |  33 ++
 .../TestWALSplitBoundedLogWriterCreation.java   |  44 +++
 3 files changed, 330 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index cc065e5..1927eb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -25,6 +25,7 @@ import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -161,6 +162,9 @@ public class WALSplitter {
 
   protected boolean distributedLogReplay;
 
+  private final boolean splitWriterCreationBounded;
+
+
   // Map encodedRegionName -> lastFlushedSequenceId
   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
 
@@ -180,6 +184,8 @@ public class WALSplitter {
   // the file being split currently
   private FileStatus fileBeingSplit;
 
+  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
@@ -194,10 +200,10 @@ public class WALSplitter {
     this.csm = (BaseCoordinatedStateManager)csm;
     this.walFactory = factory;
     this.controller = new PipelineController();
-
+    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
     entryBuffers = new EntryBuffers(controller,
         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
-            128*1024*1024));
+            128*1024*1024), splitWriterCreationBounded);
 
     // a larger minBatchSize may slow down recovery because replay writer has to wait for
     // enough edits before replaying them
@@ -212,7 +218,12 @@ public class WALSplitter {
         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
       }
       this.distributedLogReplay = false;
-      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+      if(splitWriterCreationBounded){
+        outputSink = new BoundedLogWriterCreationOutputSink(controller,
+            entryBuffers, numWriterThreads);
+      }else {
+        outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+      }
     }
 
   }
@@ -923,11 +934,19 @@ public class WALSplitter {
     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 
     long totalBuffered = 0;
-    long maxHeapUsage;
+    final long maxHeapUsage;
+    boolean splitWriterCreationBounded;
+
 
     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+      this(controller, maxHeapUsage, false);
+    }
+
+    public EntryBuffers(PipelineController controller, long maxHeapUsage,
+        boolean splitWriterCreationBounded) {
       this.controller = controller;
       this.maxHeapUsage = maxHeapUsage;
+      this.splitWriterCreationBounded = splitWriterCreationBounded;
     }
 
     /**
@@ -967,6 +986,14 @@ public class WALSplitter {
      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
      */
     synchronized RegionEntryBuffer getChunkToWrite() {
+      // The core of bounding writer creation: a chunk is returned only once the buffered heap
+      // size reaches maxHeapUsage. Thus we do not need to create a writer for each region
+      // during splitting. All edits left in the buffers are flushed after splitting through a
+      // thread pool, which keeps the number of writers created under control.
+      if(splitWriterCreationBounded && totalBuffered < maxHeapUsage){
+        return null;
+      }
+
       long biggestSize = 0;
       byte[] biggestBufferKey = null;
 
@@ -1145,11 +1172,9 @@ public class WALSplitter {
     protected PipelineController controller;
     protected EntryBuffers entryBuffers;
 
-    protected Map<byte[], SinkWriter> writers = Collections
-        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
-
-    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
-        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
+    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+        new ConcurrentHashMap<>();
 
     protected final List<WriterThread> writerThreads = Lists.newArrayList();
 
@@ -1195,18 +1220,19 @@ public class WALSplitter {
      * Update region's maximum edit log SeqNum.
      */
     void updateRegionMaximumEditLogSeqNum(Entry entry) {
+
       synchronized (regionMaximumEditLogSeqNum) {
-        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
-            .getEncodedRegionName());
+        String encodedRegionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(encodedRegionName);
         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
-          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
+          regionMaximumEditLogSeqNum.put(encodedRegionName, entry.getKey()
               .getLogSeqNum());
         }
       }
     }
 
     Long getRegionMaximumEditLogSeqNum(byte[] region) {
-      return regionMaximumEditLogSeqNum.get(region);
+      return regionMaximumEditLogSeqNum.get(Bytes.toString(region));
     }
 
     /**
@@ -1328,7 +1354,7 @@ public class WALSplitter {
     }
 
     // delete the one with fewer wal entries
-    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
+    void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
       long dstMinLogSeqNum = -1L;
       try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
         WAL.Entry entry = reader.next();
@@ -1364,7 +1390,7 @@ public class WALSplitter {
      * Close all of the output streams.
      * @return the list of paths written.
      */
-    private List<Path> close() throws IOException {
+    List<Path> close() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
 
       final List<Path> paths = new ArrayList<Path>();
@@ -1381,71 +1407,9 @@ public class WALSplitter {
         });
       CompletionService<Void> completionService =
         new ExecutorCompletionService<Void>(closeThreadPool);
-      for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
-        }
-        completionService.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
-            if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
-            try {
-              wap.w.close();
-            } catch (IOException ioe) {
-              LOG.error("Couldn't close log at " + wap.p, ioe);
-              thrown.add(ioe);
-              return null;
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
-                + " edits, skipped " + wap.editsSkipped + " edits in "
-                + (wap.nanosSpent / 1000 / 1000) + "ms");
-            }
-            if (wap.editsWritten == 0) {
-              // just remove the empty recovered.edits file
-              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
-                LOG.warn("Failed deleting empty " + wap.p);
-                throw new IOException("Failed deleting empty  " + wap.p);
-              }
-              return null;
-            }
-
-            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-              regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
-            try {
-              if (!dst.equals(wap.p) && fs.exists(dst)) {
-                deleteOneWithFewerEntries(wap, dst);
-              }
-              // Skip the unit tests which create a splitter that reads and
-              // writes the data without touching disk.
-              // TestHLogSplit#testThreading is an example.
-              if (fs.exists(wap.p)) {
-                if (!fs.rename(wap.p, dst)) {
-                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
-                }
-                LOG.info("Rename " + wap.p + " to " + dst);
-              }
-            } catch (IOException ioe) {
-              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
-              thrown.add(ioe);
-              return null;
-            }
-            paths.add(dst);
-            return null;
-          }
-        });
-      }
-
-      boolean progress_failed = false;
-      try {
-        for (int i = 0, n = this.writers.size(); i < n; i++) {
-          Future<Void> future = completionService.take();
-          future.get();
-          if (!progress_failed && reporter != null && !reporter.progress()) {
-            progress_failed = true;
-          }
-        }
+      boolean progress_failed;
+      try{
+        progress_failed = executeCloseTask(completionService, thrown, paths);
       } catch (InterruptedException e) {
         IOException iie = new InterruptedIOException();
         iie.initCause(e);
@@ -1467,6 +1431,83 @@ public class WALSplitter {
       return paths;
     }
 
+    boolean executeCloseTask(CompletionService<Void> completionService,
+        final List<IOException> thrown, final List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
+        }
+        completionService.submit(new Callable<Void>() {
+          @Override public Void call() throws Exception {
+            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
+            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+      boolean progress_failed = false;
+      for (int i = 0, n = this.writers.size(); i < n; i++) {
+        Future<Void> future = completionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+      return progress_failed;
+    }
+
+    Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown)
+        throws IOException {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Closing " + wap.p);
+      }
+      try {
+        wap.w.close();
+      } catch (IOException ioe) {
+        LOG.error("Couldn't close log at " + wap.p, ioe);
+        thrown.add(ioe);
+        return null;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
+            + " edits, skipped " + wap.editsSkipped + " edits in "
+            + (wap.nanosSpent / 1000 / 1000) + "ms");
+      }
+      if (wap.editsWritten == 0) {
+        // just remove the empty recovered.edits file
+        if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+          LOG.warn("Failed deleting empty " + wap.p);
+          throw new IOException("Failed deleting empty  " + wap.p);
+        }
+        return null;
+      }
+
+      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+          regionMaximumEditLogSeqNum.get(encodedRegionName));
+      try {
+        if (!dst.equals(wap.p) && fs.exists(dst)) {
+          deleteOneWithFewerEntries(wap, dst);
+        }
+        // Skip the unit tests which create a splitter that reads and
+        // writes the data without touching disk.
+        // TestHLogSplit#testThreading is an example.
+        if (fs.exists(wap.p)) {
+          if (!fs.rename(wap.p, dst)) {
+            throw new IOException("Failed renaming " + wap.p + " to " + dst);
+          }
+          LOG.info("Rename " + wap.p + " to " + dst);
+        }
+      } catch (IOException ioe) {
+        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        thrown.add(ioe);
+        return null;
+      }
+      return dst;
+    }
+
+
     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
       if (writersClosed) {
         return thrown;
@@ -1490,20 +1531,19 @@ public class WALSplitter {
           }
         }
       } finally {
-        synchronized (writers) {
-          WriterAndPath wap = null;
-          for (SinkWriter tmpWAP : writers.values()) {
-            try {
-              wap = (WriterAndPath) tmpWAP;
-              wap.w.close();
-            } catch (IOException ioe) {
-              LOG.error("Couldn't close log at " + wap.p, ioe);
-              thrown.add(ioe);
-              continue;
-            }
-            LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
-                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        WriterAndPath wap = null;
+        for (SinkWriter tmpWAP : writers.values()) {
+          try {
+            wap = (WriterAndPath) tmpWAP;
+            wap.w.close();
+          } catch (IOException ioe) {
+            LOG.error("Couldn't close log at " + wap.p, ioe);
+            thrown.add(ioe);
+            continue;
           }
+          LOG.info(
+              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+                  / 1000 / 1000) + "ms)");
         }
         writersClosed = true;
       }
@@ -1516,9 +1556,10 @@ public class WALSplitter {
      * long as multiple threads are always acting on different regions.
      * @return null if this region shouldn't output any logs
      */
-    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
       byte region[] = entry.getKey().getEncodedRegionName();
-      WriterAndPath ret = (WriterAndPath) writers.get(region);
+      String regionName = Bytes.toString(region);
+      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
       if (ret != null) {
         return ret;
       }
@@ -1532,14 +1573,17 @@ public class WALSplitter {
         blacklistedRegions.add(region);
         return null;
       }
-      writers.put(region, ret);
+
+      if(reusable) {
+        writers.put(regionName, ret);
+      }
       return ret;
     }
 
     /**
      * @return a path with a write for that path. caller should close.
      */
-    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
+    WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir,
           fileBeingSplit.getPath().getName());
       if (regionedits == null) {
@@ -1558,7 +1602,7 @@ public class WALSplitter {
       return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum());
     }
 
-    private void filterCellByStore(Entry logEntry) {
+    void filterCellByStore(Entry logEntry) {
       Map<byte[], Long> maxSeqIdInStores =
           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
@@ -1589,12 +1633,16 @@ public class WALSplitter {
 
     @Override
     public void append(RegionEntryBuffer buffer) throws IOException {
+      appendBuffer(buffer, true);
+    }
+
+
+    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
-        return;
+        return null;
       }
-
       WriterAndPath wap = null;
 
       long startTime = System.nanoTime();
@@ -1603,12 +1651,12 @@ public class WALSplitter {
 
         for (Entry logEntry : entries) {
           if (wap == null) {
-            wap = getWriterAndPath(logEntry);
+            wap = getWriterAndPath(logEntry, reusable);
             if (wap == null) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
               }
-              return;
+              return null;
             }
           }
           filterCellByStore(logEntry);
@@ -1628,6 +1676,7 @@ public class WALSplitter {
         LOG.fatal(" Got while writing log entry to log", e);
         throw e;
       }
+      return wap;
     }
 
     @Override
@@ -1648,8 +1697,8 @@ public class WALSplitter {
     public Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
-        for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
-          ret.put(entry.getKey(), entry.getValue().editsWritten);
+        for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
+          ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
         }
       }
       return ret;
@@ -1662,6 +1711,105 @@ public class WALSplitter {
   }
 
   /**
+   * Output sink that limits the number of HDFS writers created while splitting the logs.
+   */
+  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink{
+
+    ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+
+    public BoundedLogWriterCreationOutputSink(PipelineController controller,
+        EntryBuffers entryBuffers, int numWriters){
+      super(controller, entryBuffers, numWriters);
+    }
+
+    @Override
+    public List<Path> finishWritingAndClose() throws IOException {
+      boolean isSuccessful;
+      List<Path> result;
+      try {
+        isSuccessful = finishWriting(false);
+      } finally {
+        result = close();
+      }
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
+    }
+
+    @Override
+    boolean executeCloseTask(CompletionService<Void> closeCompletionService,
+        final List<IOException> thrown, final List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
+        LOG.info("Submitting write then close of " +
+            Bytes.toString(buffer.getValue().encodedRegionName));
+        closeCompletionService.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            Path dst = writeThenClose(buffer.getValue());
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+
+      boolean progress_failed = false;
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+      return progress_failed;
+    }
+
+    @Override
+    public Map<byte[], Long> getOutputCounts() {
+      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+      for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
+        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+      }
+      return regionRecoverStatMapResult;
+    }
+
+    @Override
+    public int getNumberOfRecoveredRegions() {
+      return regionRecoverStatMap.size();
+    }
+
+    @Override
+    public void append(RegionEntryBuffer buffer) throws IOException {
+      writeThenClose(buffer);
+    }
+
+    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+      WriterAndPath wap = appendBuffer(buffer, false);
+      Path dst = null;
+      if(wap != null){
+        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+        if(value != null){
+          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+          regionRecoverStatMap.put(encodedRegionName, newValue);
+        }
+      }
+
+      List<IOException> thrown = new ArrayList<>();
+      if(wap != null) {
+        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+      }
+
+      if(!thrown.isEmpty()){
+        throw MultipleIOException.createIOException(thrown);
+      }
+      return dst;
+    }
+  }
+
+
+  /**
    * Class wraps the actual writer which writes data out and related statistics
    */
   public abstract static class SinkWriter {
@@ -1720,7 +1868,7 @@ public class WALSplitter {
         .synchronizedMap(new TreeMap<TableName, HConnection>());
     /**
      * Map key -> value layout
-     * <servername>:<table name> -> Queue<Row>
+     * servername:table name -> Queue(Row)
      */
     private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
         new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
new file mode 100644
index 0000000..36bc63a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALReplay.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4f46f53/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
new file mode 100644
index 0000000..285e8f3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+
+  /**
+   * The logic of this test conflicts with the bounded writer-creation split logic, so skip it.
+   */
+  @Test(timeout=300000)
+  @Ignore
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    super.testThreadingSlowWriterSmallBuffer();
+  }
+}
+