You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2018/01/02 20:12:31 UTC
hbase git commit: HBASE-19358 Improve the stability of splitting log
when do fail over
Repository: hbase
Updated Branches:
refs/heads/master 4e9f4abb1 -> f6f57d38f
HBASE-19358 Improve the stability of splitting log when do fail over
Signed-off-by: Yu Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f6f57d38
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f6f57d38
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f6f57d38
Branch: refs/heads/master
Commit: f6f57d38f750c34e2353f605f815c019c8ba3f88
Parents: 4e9f4ab
Author: Jingyun Tian <ti...@gmail.com>
Authored: Tue Jan 2 19:17:34 2018 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Jan 3 04:11:40 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/wal/WALSplitter.java | 400 +++++++++++++------
.../TestWALReplayBoundedLogWriterCreation.java | 35 ++
.../TestWALSplitBoundedLogWriterCreation.java | 44 ++
3 files changed, 358 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 817e925..328390e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -18,11 +18,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -30,6 +25,7 @@ import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -68,6 +64,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager;
@@ -77,12 +74,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -99,6 +90,16 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
/**
* This class is responsible for splitting up a bunch of regionserver commit log
* files that are no longer being written to, into new files, one per region, for
@@ -138,6 +139,12 @@ public class WALSplitter {
// the file being split currently
private FileStatus fileBeingSplit;
+ // if we limit the number of writers opened for sinking recovered edits
+ private final boolean splitWriterCreationBounded;
+
+ public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+
+
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker,
@@ -154,11 +161,19 @@ public class WALSplitter {
this.walFactory = factory;
PipelineController controller = new PipelineController();
+ this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+
entryBuffers = new EntryBuffers(controller,
- this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
+ this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
+ splitWriterCreationBounded);
int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
- outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+ if(splitWriterCreationBounded){
+ outputSink = new BoundedLogWriterCreationOutputSink(
+ controller, entryBuffers, numWriterThreads);
+ }else {
+ outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+ }
}
/**
@@ -840,10 +855,17 @@ public class WALSplitter {
long totalBuffered = 0;
long maxHeapUsage;
+ boolean splitWriterCreationBounded;
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+ this(controller, maxHeapUsage, false);
+ }
+
+ public EntryBuffers(PipelineController controller, long maxHeapUsage,
+ boolean splitWriterCreationBounded){
this.controller = controller;
this.maxHeapUsage = maxHeapUsage;
+ this.splitWriterCreationBounded = splitWriterCreationBounded;
}
/**
@@ -884,6 +906,13 @@ public class WALSplitter {
* @return RegionEntryBuffer a buffer of edits to be written.
*/
synchronized RegionEntryBuffer getChunkToWrite() {
+ // The core part of limiting opening writers is it doesn't return chunk only if the
+ // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
+ // region during splitting. It will flush all the logs in the buffer after splitting
+ // through a threadpool, which means the number of writers it created is under control.
+ if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
+ return null;
+ }
long biggestSize = 0;
byte[] biggestBufferKey = null;
@@ -1062,11 +1091,10 @@ public class WALSplitter {
protected PipelineController controller;
protected EntryBuffers entryBuffers;
- protected Map<byte[], SinkWriter> writers = Collections
- .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
+ protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
+ protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+ new ConcurrentHashMap<>();
- protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
- .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
protected final List<WriterThread> writerThreads = Lists.newArrayList();
@@ -1113,11 +1141,10 @@ public class WALSplitter {
*/
void updateRegionMaximumEditLogSeqNum(Entry entry) {
synchronized (regionMaximumEditLogSeqNum) {
- Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
- .getEncodedRegionName());
+ String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+ Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
- regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
- .getSequenceId());
+ regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
}
}
}
@@ -1277,87 +1304,24 @@ public class WALSplitter {
* Close all of the output streams.
* @return the list of paths written.
*/
- private List<Path> close() throws IOException {
+ List<Path> close() throws IOException {
Preconditions.checkState(!closeAndCleanCompleted);
final List<Path> paths = new ArrayList<>();
final List<IOException> thrown = Lists.newArrayList();
- ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
- TimeUnit.SECONDS, new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "split-log-closeStream-" + count++);
- return t;
- }
- });
- CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
- for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
- }
- completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
- if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
- try {
- wap.w.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
- thrown.add(ioe);
- return null;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
- + " edits, skipped " + wap.editsSkipped + " edits in "
- + (wap.nanosSpent / 1000 / 1000) + "ms");
- }
- if (wap.editsWritten == 0) {
- // just remove the empty recovered.edits file
- if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
- LOG.warn("Failed deleting empty " + wap.p);
- throw new IOException("Failed deleting empty " + wap.p);
- }
- return null;
- }
+ ThreadPoolExecutor closeThreadPool = Threads
+ .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+ private int count = 1;
- Path dst = getCompletedRecoveredEditsFilePath(wap.p,
- regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
- try {
- if (!dst.equals(wap.p) && fs.exists(dst)) {
- deleteOneWithFewerEntries(wap, dst);
- }
- // Skip the unit tests which create a splitter that reads and
- // writes the data without touching disk.
- // TestHLogSplit#testThreading is an example.
- if (fs.exists(wap.p)) {
- if (!fs.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to " + dst);
- }
- LOG.info("Rename " + wap.p + " to " + dst);
- }
- } catch (IOException ioe) {
- LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
- thrown.add(ioe);
- return null;
+ @Override public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "split-log-closeStream-" + count++);
+ return t;
}
- paths.add(dst);
- return null;
- }
- });
- }
-
- boolean progress_failed = false;
+ });
+ CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
+ boolean progress_failed;
try {
- for (int i = 0, n = this.writers.size(); i < n; i++) {
- Future<Void> future = completionService.take();
- future.get();
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- }
+ progress_failed = executeCloseTask(completionService, thrown, paths);
} catch (InterruptedException e) {
IOException iie = new InterruptedIOException();
iie.initCause(e);
@@ -1367,7 +1331,6 @@ public class WALSplitter {
} finally {
closeThreadPool.shutdownNow();
}
-
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
}
@@ -1379,6 +1342,88 @@ public class WALSplitter {
return paths;
}
+ /**
+ * @param completionService threadPool to execute the closing tasks
+ * @param thrown store the exceptions
+ * @param paths arrayList to store the paths written
+ * @return if close tasks executed successful
+ */
+ boolean executeCloseTask(CompletionService<Void> completionService,
+ List<IOException> thrown, List<Path> paths)
+ throws InterruptedException, ExecutionException {
+ for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
+ }
+ completionService.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
+ Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+ paths.add(dst);
+ return null;
+ }
+ });
+ }
+ boolean progress_failed = false;
+ for (int i = 0, n = this.writers.size(); i < n; i++) {
+ Future<Void> future = completionService.take();
+ future.get();
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ }
+ }
+ return progress_failed;
+ }
+
+ Path closeWriter(String encodedRegionName, WriterAndPath wap,
+ List<IOException> thrown) throws IOException{
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing " + wap.p);
+ }
+ try {
+ wap.w.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close log at " + wap.p, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
+ + " edits, skipped " + wap.editsSkipped + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms");
+ }
+ if (wap.editsWritten == 0) {
+ // just remove the empty recovered.edits file
+ if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+ LOG.warn("Failed deleting empty " + wap.p);
+ throw new IOException("Failed deleting empty " + wap.p);
+ }
+ return null;
+ }
+
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+ regionMaximumEditLogSeqNum.get(encodedRegionName));
+ try {
+ if (!dst.equals(wap.p) && fs.exists(dst)) {
+ deleteOneWithFewerEntries(wap, dst);
+ }
+ // Skip the unit tests which create a splitter that reads and
+ // writes the data without touching disk.
+ // TestHLogSplit#testThreading is an example.
+ if (fs.exists(wap.p)) {
+ if (!fs.rename(wap.p, dst)) {
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
+ }
+ LOG.info("Rename " + wap.p + " to " + dst);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ return dst;
+ }
+
private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
if (writersClosed) {
return thrown;
@@ -1402,20 +1447,19 @@ public class WALSplitter {
}
}
} finally {
- synchronized (writers) {
- WriterAndPath wap = null;
- for (SinkWriter tmpWAP : writers.values()) {
- try {
- wap = (WriterAndPath) tmpWAP;
- wap.w.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
- thrown.add(ioe);
- continue;
- }
- LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
- + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ WriterAndPath wap = null;
+ for (SinkWriter tmpWAP : writers.values()) {
+ try {
+ wap = (WriterAndPath) tmpWAP;
+ wap.w.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close log at " + wap.p, ioe);
+ thrown.add(ioe);
+ continue;
}
+ LOG.info(
+ "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+ / 1000 / 1000) + "ms)");
}
writersClosed = true;
}
@@ -1428,9 +1472,10 @@ public class WALSplitter {
* long as multiple threads are always acting on different regions.
* @return null if this region shouldn't output any logs
*/
- private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+ WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
byte region[] = entry.getKey().getEncodedRegionName();
- WriterAndPath ret = (WriterAndPath) writers.get(region);
+ String regionName = Bytes.toString(region);
+ WriterAndPath ret = (WriterAndPath) writers.get(regionName);
if (ret != null) {
return ret;
}
@@ -1444,14 +1489,16 @@ public class WALSplitter {
blacklistedRegions.add(region);
return null;
}
- writers.put(region, ret);
+ if(reusable) {
+ writers.put(regionName, ret);
+ }
return ret;
}
/**
* @return a path with a write for that path. caller should close.
*/
- private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
+ WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
if (regionedits == null) {
return null;
@@ -1469,7 +1516,7 @@ public class WALSplitter {
return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
}
- private void filterCellByStore(Entry logEntry) {
+ void filterCellByStore(Entry logEntry) {
Map<byte[], Long> maxSeqIdInStores =
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
@@ -1500,10 +1547,14 @@ public class WALSplitter {
@Override
public void append(RegionEntryBuffer buffer) throws IOException {
+ appendBuffer(buffer, true);
+ }
+
+ WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
LOG.warn("got an empty buffer, skipping");
- return;
+ return null;
}
WriterAndPath wap = null;
@@ -1514,14 +1565,14 @@ public class WALSplitter {
for (Entry logEntry : entries) {
if (wap == null) {
- wap = getWriterAndPath(logEntry);
+ wap = getWriterAndPath(logEntry, reusable);
if (wap == null) {
if (LOG.isTraceEnabled()) {
// This log spews the full edit. Can be massive in the log. Enable only debugging
// WAL lost edit issues.
LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
}
- return;
+ return null;
}
}
filterCellByStore(logEntry);
@@ -1542,6 +1593,7 @@ public class WALSplitter {
LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
throw e;
}
+ return wap;
}
@Override
@@ -1561,10 +1613,8 @@ public class WALSplitter {
@Override
public Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- synchronized (writers) {
- for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().editsWritten);
- }
+ for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
+ ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
}
return ret;
}
@@ -1576,6 +1626,114 @@ public class WALSplitter {
}
/**
+ *
+ */
+ class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
+
+ private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+ public BoundedLogWriterCreationOutputSink(PipelineController controller,
+ EntryBuffers entryBuffers, int numWriters) {
+ super(controller, entryBuffers, numWriters);
+ }
+
+ @Override
+ public List<Path> finishWritingAndClose() throws IOException {
+ boolean isSuccessful;
+ List<Path> result;
+ try {
+ isSuccessful = finishWriting(false);
+ } finally {
+ result = close();
+ }
+ if (isSuccessful) {
+ splits = result;
+ }
+ return splits;
+ }
+
+ @Override
+ boolean executeCloseTask(CompletionService<Void> completionService,
+ List<IOException> thrown, List<Path> paths)
+ throws InterruptedException, ExecutionException {
+ for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
+ LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
+ completionService.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ Path dst = writeThenClose(buffer.getValue());
+ paths.add(dst);
+ return null;
+ }
+ });
+ }
+ boolean progress_failed = false;
+ for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+ Future<Void> future = completionService.take();
+ future.get();
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ }
+ }
+
+ return progress_failed;
+ }
+
+ /**
+ * since the splitting process may create multiple output files, we need a map
+ * regionRecoverStatMap to track the output count of each region.
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
+ Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+ for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
+ regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+ }
+ return regionRecoverStatMapResult;
+ }
+
+ /**
+ * @return the number of recovered regions
+ */
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return regionRecoverStatMap.size();
+ }
+
+ /**
+ * Append the buffer to a new recovered edits file, then close it after all done
+ * @param buffer contain all entries of a certain region
+ * @throws IOException when closeWriter failed
+ */
+ @Override
+ public void append(RegionEntryBuffer buffer) throws IOException {
+ writeThenClose(buffer);
+ }
+
+ private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+ WriterAndPath wap = appendBuffer(buffer, false);
+ if(wap != null) {
+ String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+ Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+ if (value != null) {
+ Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+ regionRecoverStatMap.put(encodedRegionName, newValue);
+ }
+ }
+
+ Path dst = null;
+ List<IOException> thrown = new ArrayList<>();
+ if(wap != null){
+ dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+ }
+ if (!thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ return dst;
+ }
+ }
+
+ /**
* Class wraps the actual writer which writes data out and related statistics
*/
public abstract static class SinkWriter {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
new file mode 100644
index 0000000..55bbeaf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+
+@Category(MediumTests.class)
+public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestWALReplay.setUpBeforeClass();
+ TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6f57d38/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
new file mode 100644
index 0000000..844cb3a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestWALSplit.setUpBeforeClass();
+ TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+ }
+
+ /**
+ * The logic of this test has conflict with the limit writers split logic, skip this test
+ */
+ @Test(timeout=300000)
+ @Ignore
+ public void testThreadingSlowWriterSmallBuffer() throws Exception {
+ super.testThreadingSlowWriterSmallBuffer();
+ }
+}
+