You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:50 UTC

[11/21] hbase git commit: HBASE-12522 Backport of write-ahead-log refactoring and follow-ons.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
new file mode 100644
index 0000000..2d6b589
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -0,0 +1,2171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagRewriteCell;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LastSequenceId;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.io.MultipleIOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+
+/**
+ * This class is responsible for splitting up a bunch of regionserver commit log
+ * files that are no longer being written to, into new files, one per region for
+ * region to replay on startup. Delete the old log files when finished.
+ */
+@InterfaceAudience.Private
+public class WALSplitter {
+  static final Log LOG = LogFactory.getLog(WALSplitter.class);
+
+  /** By default we retry errors in splitting, rather than skipping. */
+  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
+
+  // Parameters for split process
+  protected final Path rootDir;
+  protected final FileSystem fs;
+  protected final Configuration conf;
+
+  // Major subcomponents of the split process.
+  // These are separated into inner classes to make testing easier.
+  OutputSink outputSink;
+  EntryBuffers entryBuffers;
+
+  private Set<TableName> disablingOrDisabledTables =
+      new HashSet<TableName>();
+  private BaseCoordinatedStateManager csm;
+  private final WALFactory walFactory;
+
+  // If an exception is thrown by one of the other threads, it will be
+  // stored here.
+  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+  // Wait/notify for when data has been produced by the reader thread,
+  // consumed by the reader thread, or an exception occurred
+  final Object dataAvailable = new Object();
+
+  private MonitoredTask status;
+
+  // For checking the latest flushed sequence id
+  protected final LastSequenceId sequenceIdChecker;
+
+  protected boolean distributedLogReplay;
+
+  // Map encodedRegionName -> lastFlushedSequenceId
+  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
+
+  // Map encodedRegionName -> maxSeqIdInStores
+  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
+      new ConcurrentHashMap<String, Map<byte[], Long>>();
+
+  // Failed region server that the wal file being split belongs to
+  protected String failedServerName = "";
+
+  // Number of writer threads
+  private final int numWriterThreads;
+
+  // Min batch size when replay WAL edits
+  private final int minBatchSize;
+
+  WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
+      FileSystem fs, LastSequenceId idChecker,
+      CoordinatedStateManager csm, RecoveryMode mode) {
+    this.conf = HBaseConfiguration.create(conf);
+    String codecClassName = conf
+        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
+    this.rootDir = rootDir;
+    this.fs = fs;
+    this.sequenceIdChecker = idChecker;
+    this.csm = (BaseCoordinatedStateManager)csm;
+    this.walFactory = factory;
+
+    entryBuffers = new EntryBuffers(
+        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
+            128*1024*1024));
+
+    // a larger minBatchSize may slow down recovery because replay writer has to wait for
+    // enough edits before replaying them
+    this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
+    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
+
+    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    if (csm != null && this.distributedLogReplay) {
+      outputSink = new LogReplayOutputSink(numWriterThreads);
+    } else {
+      if (this.distributedLogReplay) {
+        LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
+      }
+      this.distributedLogReplay = false;
+      outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
+    }
+
+  }
+
+  /**
+   * Splits a WAL file into region's recovered-edits directory.
+   * This is the main entry point for distributed log splitting from SplitLogWorker.
+   * <p>
+   * If the log file has N regions then N recovered.edits files will be produced.
+   * <p>
+   * @param rootDir
+   * @param logfile
+   * @param fs
+   * @param conf
+   * @param reporter
+   * @param idChecker
+   * @param cp coordination state manager
+   * @return false if it is interrupted by the progress-able.
+   * @throws IOException
+   */
+  public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
+      CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
+    WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
+    return s.splitLogFile(logfile, reporter);
+  }
+
+  // A wrapper to split one log folder using the method used by distributed
+  // log splitting. Used by tools and unit tests. It should be package private.
+  // It is public only because UpgradeTo96 and TestWALObserver are in different packages,
+  // which uses this method to do log splitting.
+  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
+      FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
+    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
+        Collections.singletonList(logDir), null);
+    List<Path> splits = new ArrayList<Path>();
+    if (logfiles != null && logfiles.length > 0) {
+      for (FileStatus logfile: logfiles) {
+        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
+            RecoveryMode.LOG_SPLITTING);
+        if (s.splitLogFile(logfile, null)) {
+          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
+          if (s.outputSink.splits != null) {
+            splits.addAll(s.outputSink.splits);
+          }
+        }
+      }
+    }
+    if (!fs.delete(logDir, true)) {
+      throw new IOException("Unable to delete src dir: " + logDir);
+    }
+    return splits;
+  }
+
+  /**
+   * log splitting implementation, splits one log file.
+   * @param logfile should be an actual log file.
+   */
+  boolean splitLogFile(FileStatus logfile,
+      CancelableProgressable reporter) throws IOException {
+    Preconditions.checkState(status == null);
+    Preconditions.checkArgument(logfile.isFile(),
+        "passed in file status is for something other than a regular file.");
+    boolean isCorrupted = false;
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
+      SPLIT_SKIP_ERRORS_DEFAULT);
+    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
+    Path logPath = logfile.getPath();
+    boolean outputSinkStarted = false;
+    boolean progress_failed = false;
+    int editsCount = 0;
+    int editsSkipped = 0;
+
+    status =
+        TaskMonitor.get().createStatus(
+          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+    Reader in = null;
+    try {
+      long logLength = logfile.getLen();
+      LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
+      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
+      status.setStatus("Opening log file");
+      if (reporter != null && !reporter.progress()) {
+        progress_failed = true;
+        return false;
+      }
+      try {
+        in = getReader(logfile, skipErrors, reporter);
+      } catch (CorruptedLogFileException e) {
+        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
+        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
+        isCorrupted = true;
+      }
+      if (in == null) {
+        LOG.warn("Nothing to split in log file " + logPath);
+        return true;
+      }
+      if (csm != null) {
+        try {
+          TableStateManager tsm = csm.getTableStateManager();
+          disablingOrDisabledTables = tsm.getTablesInStates(
+          ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
+        } catch (CoordinatedStateException e) {
+          throw new IOException("Can't get disabling/disabled tables", e);
+        }
+      }
+      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
+      int numOpenedFilesLastCheck = 0;
+      outputSink.setReporter(reporter);
+      outputSink.startWriterThreads();
+      outputSinkStarted = true;
+      Entry entry;
+      Long lastFlushedSequenceId = -1L;
+      ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
+      failedServerName = (serverName == null) ? "" : serverName.getServerName();
+      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
+        byte[] region = entry.getKey().getEncodedRegionName();
+        String key = Bytes.toString(region);
+        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
+        if (lastFlushedSequenceId == null) {
+          if (this.distributedLogReplay) {
+            RegionStoreSequenceIds ids =
+                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
+                  key);
+            if (ids != null) {
+              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+            }
+          } else if (sequenceIdChecker != null) {
+            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+          }
+          if (lastFlushedSequenceId == null) {
+            lastFlushedSequenceId = -1L;
+          }
+          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
+        }
+        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+          editsSkipped++;
+          continue;
+        }
+        entryBuffers.appendEntry(entry);
+        editsCount++;
+        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
+        // If sufficient edits have passed, check if we should report progress.
+        if (editsCount % interval == 0
+            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
+          numOpenedFilesLastCheck = this.getNumOpenWriters();
+          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+              + " edits, skipped " + editsSkipped + " edits.";
+          status.setStatus("Split " + countsStr);
+          if (reporter != null && !reporter.progress()) {
+            progress_failed = true;
+            return false;
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
+    } catch (CorruptedLogFileException e) {
+      LOG.warn("Could not parse, corrupted log file " + logPath, e);
+      csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
+        logfile.getPath().getName(), fs);
+      isCorrupted = true;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      throw e;
+    } finally {
+      LOG.debug("Finishing writing output logs and closing down.");
+      try {
+        if (null != in) {
+          in.close();
+        }
+      } catch (IOException exception) {
+        LOG.warn("Could not close wal reader: " + exception.getMessage());
+        LOG.debug("exception details", exception);
+      }
+      try {
+        if (outputSinkStarted) {
+          // Set progress_failed to true as the immediate following statement will reset its value
+          // when finishWritingAndClose() throws exception, progress_failed has the right value
+          progress_failed = true;
+          progress_failed = outputSink.finishWritingAndClose() == null;
+        }
+      } finally {
+        String msg =
+            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
+                + " regions; log file=" + logPath + " is corrupted = " + isCorrupted
+                + " progress failed = " + progress_failed;
+        LOG.info(msg);
+        status.markComplete(msg);
+      }
+    }
+    return !progress_failed;
+  }
+
+  /**
+   * Completes the work done by splitLogFile by archiving logs
+   * <p>
+   * It is invoked by SplitLogManager once it knows that one of the
+   * SplitLogWorkers have completed the splitLogFile() part. If the master
+   * crashes then this function might get called multiple times.
+   * <p>
+   * @param logfile
+   * @param conf
+   * @throws IOException
+   */
+  public static void finishSplitLogFile(String logfile,
+      Configuration conf)  throws IOException {
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path logPath;
+    if (FSUtils.isStartingWithPath(rootdir, logfile)) {
+      logPath = new Path(logfile);
+    } else {
+      logPath = new Path(rootdir, logfile);
+    }
+    finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
+  }
+
+  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+      Path logPath, Configuration conf) throws IOException {
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    FileSystem fs;
+    fs = rootdir.getFileSystem(conf);
+    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
+      corruptedLogs.add(logPath);
+    } else {
+      processedLogs.add(logPath);
+    }
+    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
+    fs.delete(stagingDir, true);
+  }
+
+  /**
+   * Moves processed logs to a oldLogDir after successful processing Moves
+   * corrupted logs (any log that couldn't be successfully parsed to corruptDir
+   * (.corrupt) for later investigation
+   *
+   * @param corruptedLogs
+   * @param processedLogs
+   * @param oldLogDir
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  private static void archiveLogs(
+      final List<Path> corruptedLogs,
+      final List<Path> processedLogs, final Path oldLogDir,
+      final FileSystem fs, final Configuration conf) throws IOException {
+    final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
+        "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
+
+    if (!fs.mkdirs(corruptDir)) {
+      LOG.info("Unable to mkdir " + corruptDir);
+    }
+    fs.mkdirs(oldLogDir);
+
+    // this method can get restarted or called multiple times for archiving
+    // the same log files.
+    for (Path corrupted : corruptedLogs) {
+      Path p = new Path(corruptDir, corrupted.getName());
+      if (fs.exists(corrupted)) {
+        if (!fs.rename(corrupted, p)) {
+          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+        } else {
+          LOG.warn("Moved corrupted log " + corrupted + " to " + p);
+        }
+      }
+    }
+
+    for (Path p : processedLogs) {
+      Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
+      if (fs.exists(p)) {
+        if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
+          LOG.warn("Unable to move  " + p + " to " + newPath);
+        } else {
+          LOG.info("Archived processed log " + p + " to " + newPath);
+        }
+      }
+    }
+  }
+
+  /**
+   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+   * <code>logEntry</code> named for the sequenceid in the passed
+   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+   * creating it if necessary.
+   * @param fs
+   * @param logEntry
+   * @param rootDir HBase root dir.
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir, boolean isCreate)
+  throws IOException {
+    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
+    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
+    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
+    Path dir = getRegionDirRecoveredEditsDir(regiondir);
+
+    if (!fs.exists(regiondir)) {
+      LOG.info("This region's directory doesn't exist: "
+          + regiondir.toString() + ". It is very likely that it was" +
+          " already split so it's safe to discard those edits.");
+      return null;
+    }
+    if (fs.exists(dir) && fs.isFile(dir)) {
+      Path tmp = new Path("/tmp");
+      if (!fs.exists(tmp)) {
+        fs.mkdirs(tmp);
+      }
+      tmp = new Path(tmp,
+        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+      LOG.warn("Found existing old file: " + dir + ". It could be some "
+        + "leftover of an old installation. It should be a folder instead. "
+        + "So moving it to " + tmp);
+      if (!fs.rename(dir, tmp)) {
+        LOG.warn("Failed to sideline old file " + dir);
+      }
+    }
+
+    if (isCreate && !fs.exists(dir)) {
+      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    }
+    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
+    // region's replayRecoveredEdits will not delete it
+    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
+    fileName = getTmpRecoveredEditsFileName(fileName);
+    return new Path(dir, fileName);
+  }
+
+  static String getTmpRecoveredEditsFileName(String fileName) {
+    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
+  }
+
+  /**
+   * Get the completed recovered edits file path, renaming it to be by last edit
+   * in the file from its first edit. Then we could use the name to skip
+   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
+   * @param srcPath
+   * @param maximumEditLogSeqNum
+   * @return dstPath take file's last edit log seq num as the name
+   */
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      Long maximumEditLogSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
+    return new Path(srcPath.getParent(), fileName);
+  }
+
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid);
+  }
+
+  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
+  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
+
+  /**
+   * @param regiondir
+   *          This regions directory in the filesystem.
+   * @return The directory that holds recovered edits files for the region
+   *         <code>regiondir</code>
+   */
+  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
+  }
+
+  /**
+   * Returns sorted set of edit files made by splitter, excluding files
+   * with '.temp' suffix.
+   *
+   * @param fs
+   * @param regiondir
+   * @return Files in passed <code>regiondir</code> as a sorted set.
+   * @throws IOException
+   */
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+      final Path regiondir) throws IOException {
+    NavigableSet<Path> filesSorted = new TreeSet<Path>();
+    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    if (!fs.exists(editsdir))
+      return filesSorted;
+    FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix. See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+          result = fs.isFile(p) && m.matches();
+          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+          // because it means splitwal thread is writting this file.
+          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+            result = false;
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on " + p);
+        }
+        return result;
+      }
+    });
+    if (files == null) {
+      return filesSorted;
+    }
+    for (FileStatus status : files) {
+      filesSorted.add(status.getPath());
+    }
+    return filesSorted;
+  }
+
+  /**
+   * Move aside a bad edits file.
+   *
+   * @param fs
+   * @param edits
+   *          Edits file to move aside.
+   * @return The name of the moved aside file.
+   * @throws IOException
+   */
+  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
+      throws IOException {
+    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
+        + System.currentTimeMillis());
+    if (!fs.rename(edits, moveAsideName)) {
+      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+    }
+    return moveAsideName;
+  }
+
+  private static final String SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+
+  /**
+   * Is the given file a region open sequence id file.
+   */
+  @VisibleForTesting
+  public static boolean isSequenceIdFile(final Path file) {
+    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX);
+  }
+
+  /**
+   * Create a file with name as region open sequence id
+   * 
+   * @param fs
+   * @param regiondir
+   * @param newSeqId
+   * @param saftyBumper
+   * @return long new sequence Id value
+   * @throws IOException
+   */
+  public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
+      long newSeqId, long saftyBumper) throws IOException {
+
+    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    long maxSeqId = 0;
+    FileStatus[] files = null;
+    if (fs.exists(editsdir)) {
+      files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          return isSequenceIdFile(p);
+        }
+      });
+      if (files != null) {
+        for (FileStatus status : files) {
+          String fileName = status.getPath().getName();
+          try {
+            Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
+                    - SEQUENCE_ID_FILE_SUFFIX.length()));
+            maxSeqId = Math.max(tmpSeqId, maxSeqId);
+          } catch (NumberFormatException ex) {
+            LOG.warn("Invalid SeqId File Name=" + fileName);
+          }
+        }
+      }
+    }
+    if (maxSeqId > newSeqId) {
+      newSeqId = maxSeqId;
+    }
+    newSeqId += saftyBumper; // bump up SeqId
+    
+    // write a new seqId file
+    Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
+    if (!fs.createNewFile(newSeqIdFile)) {
+      throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+    }
+    // remove old ones
+    if(files != null) {
+      for (FileStatus status : files) {
+        fs.delete(status.getPath(), false);
+      }
+    }
+    return newSeqId;
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   *
+   * @param file
+   * @return A new Reader instance, caller should close
+   * @throws IOException
+   * @throws CorruptedLogFileException
+   */
+  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+      throws IOException, CorruptedLogFileException {
+    Path path = file.getPath();
+    long length = file.getLen();
+    Reader in;
+
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    if (length <= 0) {
+      LOG.warn("File " + path + " might be still open, length is 0");
+    }
+
+    try {
+      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
+      try {
+        in = getReader(path, reporter);
+      } catch (EOFException e) {
+        if (length <= 0) {
+          // TODO should we ignore an empty, not-last log file if skip.errors
+          // is false? Either way, the caller should decide what to do. E.g.
+          // ignore if this is the last log in sequence.
+          // TODO is this scenario still possible if the log has been
+          // recovered (i.e. closed)
+          LOG.warn("Could not open " + path + " for reading. File is empty", e);
+          return null;
+        } else {
+          // EOFException being ignored
+          return null;
+        }
+      }
+    } catch (IOException e) {
+      if (e instanceof FileNotFoundException) {
+        // A wal file may not exist anymore. Nothing can be recovered so move on
+        LOG.warn("File " + path + " doesn't exist anymore.", e);
+        return null;
+      }
+      if (!skipErrors || e instanceof InterruptedIOException) {
+        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Could not open wal " +
+            path + " ignoring");
+      t.initCause(e);
+      throw t;
+    }
+    return in;
+  }
+
+  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  throws CorruptedLogFileException, IOException {
+    try {
+      return in.next();
+    } catch (EOFException eof) {
+      // truncated files are expected if a RS crashes (see HBASE-2643)
+      LOG.info("EOF from wal " + path + ".  continuing");
+      return null;
+    } catch (IOException e) {
+      // If the IOE resulted from bad file format,
+      // then this problem is idempotent and retrying won't help
+      if (e.getCause() != null &&
+          (e.getCause() instanceof ParseException ||
+           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
+        LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
+           + path + ".  continuing");
+        return null;
+      }
+      if (!skipErrors) {
+        throw e;
+      }
+      CorruptedLogFileException t =
+        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
+            " while parsing wal " + path + ". Marking as corrupted");
+      t.initCause(e);
+      throw t;
+    }
+  }
+
+  private void writerThreadError(Throwable t) {
+    thrown.compareAndSet(null, t);
+  }
+
+  /**
+   * Check for errors in the writer threads. If any is found, rethrow it.
+   */
+  private void checkForErrors() throws IOException {
+    Throwable thrown = this.thrown.get();
+    if (thrown == null) return;
+    if (thrown instanceof IOException) {
+      throw new IOException(thrown);
+    } else {
+      throw new RuntimeException(thrown);
+    }
+  }
+  /**
+   * Create a new {@link Writer} for writing log splits.
+   * @return a new Writer instance, caller should close
+   */
+  protected Writer createWriter(Path logfile)
+      throws IOException {
+    return walFactory.createRecoveredEditsWriter(fs, logfile);
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   * @return new Reader instance, caller should close
+   */
+  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
+    return walFactory.createReader(fs, curLogFile, reporter);
+  }
+
+  /**
+   * Get current open writers
+   */
+  private int getNumOpenWriters() {
+    int result = 0;
+    if (this.outputSink != null) {
+      result += this.outputSink.getNumOpenWriters();
+    }
+    return result;
+  }
+
+  /**
+   * Class which accumulates edits and separates them into a buffer per region
+   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
+   * a predefined threshold.
+   *
+   * Writer threads then pull region-specific buffers from this class.
+   */
+  class EntryBuffers {
+    Map<byte[], RegionEntryBuffer> buffers =
+      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
+
+    /* Track which regions are currently in the middle of writing. We don't allow
+       an IO thread to pick up bytes from a region if we're already writing
+       data for that region in a different IO thread. */
+    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    long totalBuffered = 0;
+    long maxHeapUsage;
+
+    EntryBuffers(long maxHeapUsage) {
+      this.maxHeapUsage = maxHeapUsage;
+    }
+
+    /**
+     * Append a log entry into the corresponding region buffer.
+     * Blocks if the total heap usage has crossed the specified threshold.
+     *
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    void appendEntry(Entry entry) throws InterruptedException, IOException {
+      WALKey key = entry.getKey();
+
+      RegionEntryBuffer buffer;
+      long incrHeap;
+      synchronized (this) {
+        buffer = buffers.get(key.getEncodedRegionName());
+        if (buffer == null) {
+          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
+          buffers.put(key.getEncodedRegionName(), buffer);
+        }
+        incrHeap= buffer.appendEntry(entry);
+      }
+
+      // If we crossed the chunk threshold, wait for more space to be available
+      synchronized (dataAvailable) {
+        totalBuffered += incrHeap;
+        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
+          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
+          dataAvailable.wait(2000);
+        }
+        dataAvailable.notifyAll();
+      }
+      checkForErrors();
+    }
+
+    /**
+     * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+     */
+    synchronized RegionEntryBuffer getChunkToWrite() {
+      long biggestSize = 0;
+      byte[] biggestBufferKey = null;
+
+      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+        long size = entry.getValue().heapSize();
+        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
+          biggestSize = size;
+          biggestBufferKey = entry.getKey();
+        }
+      }
+      if (biggestBufferKey == null) {
+        return null;
+      }
+
+      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+      currentlyWriting.add(biggestBufferKey);
+      return buffer;
+    }
+
+    void doneWriting(RegionEntryBuffer buffer) {
+      synchronized (this) {
+        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+        assert removed;
+      }
+      long size = buffer.heapSize();
+
+      synchronized (dataAvailable) {
+        totalBuffered -= size;
+        // We may unblock writers
+        dataAvailable.notifyAll();
+      }
+    }
+
+    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+      return currentlyWriting.contains(region);
+    }
+  }
+
+  /**
+   * A buffer of some number of edits for a given region.
+   * This accumulates edits and also provides a memory optimization in order to
+   * share a single byte array instance for the table and region name.
+   * Also tracks memory usage of the accumulated edits.
+   */
+  static class RegionEntryBuffer implements HeapSize {
+    long heapInBuffer = 0;
+    List<Entry> entryBuffer;
+    TableName tableName;
+    byte[] encodedRegionName;
+
+    RegionEntryBuffer(TableName tableName, byte[] region) {
+      this.tableName = tableName;
+      this.encodedRegionName = region;
+      this.entryBuffer = new LinkedList<Entry>();
+    }
+
+    long appendEntry(Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+      long incrHeap = entry.getEdit().heapSize() +
+        ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
+        0; // TODO linkedlist entry
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(Entry entry) {
+      WALKey k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    @Override
+    public long heapSize() {
+      return heapInBuffer;
+    }
+  }
+
+  class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+    private OutputSink outputSink = null;
+
+    WriterThread(OutputSink sink, int i) {
+      super(Thread.currentThread().getName() + "-Writer-" + i);
+      outputSink = sink;
+    }
+
+    @Override
+    public void run()  {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Exiting thread", t);
+        writerThreadError(t);
+      }
+    }
+
+    private void doRun() throws IOException {
+      LOG.debug("Writer thread " + this + ": starting");
+      while (true) {
+        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        if (buffer == null) {
+          // No data currently available, wait on some more to show up
+          synchronized (dataAvailable) {
+            if (shouldStop && !this.outputSink.flush()) {
+              return;
+            }
+            try {
+              dataAvailable.wait(500);
+            } catch (InterruptedException ie) {
+              if (!shouldStop) {
+                throw new RuntimeException(ie);
+              }
+            }
+          }
+          continue;
+        }
+
+        assert buffer != null;
+        try {
+          writeBuffer(buffer);
+        } finally {
+          entryBuffers.doneWriting(buffer);
+        }
+      }
+    }
+
+    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+      outputSink.append(buffer);
+    }
+
+    void finish() {
+      synchronized (dataAvailable) {
+        shouldStop = true;
+        dataAvailable.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * The following class is an abstraction class to provide a common interface to support both
+   * existing recovered edits file sink and region server WAL edits replay sink
+   */
+   abstract class OutputSink {
+
+    protected Map<byte[], SinkWriter> writers = Collections
+        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
+
+    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
+
+    protected final List<WriterThread> writerThreads = Lists.newArrayList();
+
+    /* Set of regions which we've decided should not output edits */
+    protected final Set<byte[]> blacklistedRegions = Collections
+        .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+
+    protected boolean closeAndCleanCompleted = false;
+
+    protected boolean writersClosed = false;
+
+    protected final int numThreads;
+
+    protected CancelableProgressable reporter = null;
+
+    protected AtomicLong skippedEdits = new AtomicLong();
+
+    protected List<Path> splits = null;
+
+    public OutputSink(int numWriters) {
+      numThreads = numWriters;
+    }
+
+    void setReporter(CancelableProgressable reporter) {
+      this.reporter = reporter;
+    }
+
+    /**
+     * Start the threads that will pump data from the entryBuffers to the output files.
+     */
+    synchronized void startWriterThreads() {
+      for (int i = 0; i < numThreads; i++) {
+        WriterThread t = new WriterThread(this, i);
+        t.start();
+        writerThreads.add(t);
+      }
+    }
+
+    /**
+     *
+     * Update region's maximum edit log SeqNum.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      synchronized (regionMaximumEditLogSeqNum) {
+        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
+            .getEncodedRegionName());
+        if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
+              .getLogSeqNum());
+        }
+      }
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
+    /**
+     * @return the number of currently opened writers
+     */
+    int getNumOpenWriters() {
+      return this.writers.size();
+    }
+
+    long getSkippedEdits() {
+      return this.skippedEdits.get();
+    }
+
+    /**
+     * Wait for writer threads to dump all info to the sink
+     * @return true when there is no error
+     * @throws IOException
+     */
+    protected boolean finishWriting() throws IOException {
+      LOG.debug("Waiting for split writer threads to finish");
+      boolean progress_failed = false;
+      for (WriterThread t : writerThreads) {
+        t.finish();
+      }
+      for (WriterThread t : writerThreads) {
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+        try {
+          t.join();
+        } catch (InterruptedException ie) {
+          IOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
+        }
+      }
+      checkForErrors();
+      LOG.info("Split writers finished");
+      return (!progress_failed);
+    }
+
+    abstract List<Path> finishWritingAndClose() throws IOException;
+
+    /**
+     * @return a map from encoded region ID to the number of edits written out for that region.
+     */
+    abstract Map<byte[], Long> getOutputCounts();
+
+    /**
+     * @return number of regions we've recovered
+     */
+    abstract int getNumberOfRecoveredRegions();
+
+    /**
+     * @param buffer A WAL Edit Entry
+     * @throws IOException
+     */
+    abstract void append(RegionEntryBuffer buffer) throws IOException;
+
+    /**
+     * WriterThread call this function to help flush internal remaining edits in buffer before close
+     * @return true when underlying sink has something to flush
+     */
+    protected boolean flush() throws IOException {
+      return false;
+    }
+  }
+
+  /**
+   * Class that manages the output streams from the log splitting process.
+   */
+  class LogRecoveredEditsOutputSink extends OutputSink {
+
+    public LogRecoveredEditsOutputSink(int numWriters) {
+      // More threads could potentially write faster at the expense
+      // of causing more disk seeks as the logs are split.
+      // 3. After a certain setting (probably around 3) the
+      // process will be bound on the reader in the current
+      // implementation anyway.
+      super(numWriters);
+    }
+
+    /**
+     * @return null if failed to report progress
+     * @throws IOException
+     */
+    @Override
+    List<Path> finishWritingAndClose() throws IOException {
+      boolean isSuccessful = false;
+      List<Path> result = null;
+      try {
+        isSuccessful = finishWriting();
+      } finally {
+        result = close();
+        List<IOException> thrown = closeLogWriters(null);
+        if (thrown != null && !thrown.isEmpty()) {
+          throw MultipleIOException.createIOException(thrown);
+        }
+      }
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
+    }
+
+    /**
+     * Close all of the output streams.
+     * @return the list of paths written.
+     */
+    private List<Path> close() throws IOException {
+      Preconditions.checkState(!closeAndCleanCompleted);
+
+      final List<Path> paths = new ArrayList<Path>();
+      final List<IOException> thrown = Lists.newArrayList();
+      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
+        TimeUnit.SECONDS, new ThreadFactory() {
+          private int count = 1;
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "split-log-closeStream-" + count++);
+            return t;
+          }
+        });
+      CompletionService<Void> completionService =
+        new ExecutorCompletionService<Void>(closeThreadPool);
+      for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
+        LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
+        completionService.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
+            LOG.debug("Closing " + wap.p);
+            try {
+              wap.w.close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close log at " + wap.p, ioe);
+              thrown.add(ioe);
+              return null;
+            }
+            LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+
+            if (wap.editsWritten == 0) {
+              // just remove the empty recovered.edits file
+              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+                LOG.warn("Failed deleting empty " + wap.p);
+                throw new IOException("Failed deleting empty  " + wap.p);
+              }
+              return null;
+            }
+
+            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+              regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
+            try {
+              if (!dst.equals(wap.p) && fs.exists(dst)) {
+                LOG.warn("Found existing old edits file. It could be the "
+                    + "result of a previous failed split attempt. Deleting " + dst + ", length="
+                    + fs.getFileStatus(dst).getLen());
+                if (!fs.delete(dst, false)) {
+                  LOG.warn("Failed deleting of old " + dst);
+                  throw new IOException("Failed deleting of old " + dst);
+                }
+              }
+              // Skip the unit tests which create a splitter that reads and
+              // writes the data without touching disk.
+              // TestHLogSplit#testThreading is an example.
+              if (fs.exists(wap.p)) {
+                if (!fs.rename(wap.p, dst)) {
+                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
+                }
+                LOG.info("Rename " + wap.p + " to " + dst);
+              }
+            } catch (IOException ioe) {
+              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+              thrown.add(ioe);
+              return null;
+            }
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+
+      boolean progress_failed = false;
+      try {
+        for (int i = 0, n = this.writers.size(); i < n; i++) {
+          Future<Void> future = completionService.take();
+          future.get();
+          if (!progress_failed && reporter != null && !reporter.progress()) {
+            progress_failed = true;
+          }
+        }
+      } catch (InterruptedException e) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(e);
+        throw iie;
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        closeThreadPool.shutdownNow();
+      }
+
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+      writersClosed = true;
+      closeAndCleanCompleted = true;
+      if (progress_failed) {
+        return null;
+      }
+      return paths;
+    }
+
+    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
+      if (writersClosed) {
+        return thrown;
+      }
+
+      if (thrown == null) {
+        thrown = Lists.newArrayList();
+      }
+      try {
+        for (WriterThread t : writerThreads) {
+          while (t.isAlive()) {
+            t.shouldStop = true;
+            t.interrupt();
+            try {
+              t.join(10);
+            } catch (InterruptedException e) {
+              IOException iie = new InterruptedIOException();
+              iie.initCause(e);
+              throw iie;
+            }
+          }
+        }
+      } finally {
+        synchronized (writers) {
+          WriterAndPath wap = null;
+          for (SinkWriter tmpWAP : writers.values()) {
+            try {
+              wap = (WriterAndPath) tmpWAP;
+              wap.w.close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close log at " + wap.p, ioe);
+              thrown.add(ioe);
+              continue;
+            }
+            LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+          }
+        }
+        writersClosed = true;
+      }
+
+      return thrown;
+    }
+
+    /**
+     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+     * long as multiple threads are always acting on different regions.
+     * @return null if this region shouldn't output any logs
+     */
+    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+      byte region[] = entry.getKey().getEncodedRegionName();
+      WriterAndPath ret = (WriterAndPath) writers.get(region);
+      if (ret != null) {
+        return ret;
+      }
+      // If we already decided that this region doesn't get any output
+      // we don't need to check again.
+      if (blacklistedRegions.contains(region)) {
+        return null;
+      }
+      ret = createWAP(region, entry, rootDir);
+      if (ret == null) {
+        blacklistedRegions.add(region);
+        return null;
+      }
+      writers.put(region, ret);
+      return ret;
+    }
+
+    /**
+     * @return a path with a write for that path. caller should close.
+     */
+    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
+      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+      if (regionedits == null) {
+        return null;
+      }
+      if (fs.exists(regionedits)) {
+        LOG.warn("Found old edits file. It could be the "
+            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+            + fs.getFileStatus(regionedits).getLen());
+        if (!fs.delete(regionedits, false)) {
+          LOG.warn("Failed delete of old " + regionedits);
+        }
+      }
+      Writer w = createWriter(regionedits);
+      LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
+      return (new WriterAndPath(regionedits, w));
+    }
+
+    @Override
+    void append(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn("got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+
+      long startTime = System.nanoTime();
+      try {
+        int editsCount = 0;
+
+        for (Entry logEntry : entries) {
+          if (wap == null) {
+            wap = getWriterAndPath(logEntry);
+            if (wap == null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
+              }
+              return;
+            }
+          }
+          wap.w.append(logEntry);
+          this.updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        }
+        // Pass along summary statistics
+        wap.incrementEdits(editsCount);
+        wap.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(" Got while writing log entry to log", e);
+        throw e;
+      }
+    }
+
+    /**
+     * @return a map from encoded region ID to the number of edits written out for that region.
+     */
+    @Override
+    Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      synchronized (writers) {
+        for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
+          ret.put(entry.getKey(), entry.getValue().editsWritten);
+        }
+      }
+      return ret;
+    }
+
+    @Override
+    int getNumberOfRecoveredRegions() {
+      return writers.size();
+    }
+  }
+
+  /**
+   * Class wraps the actual writer which writes data out and related statistics
+   */
+  private abstract static class SinkWriter {
+    /* Count of edits written to this path */
+    long editsWritten = 0;
+    /* Number of nanos spent writing to this log */
+    long nanosSpent = 0;
+
+    void incrementEdits(int edits) {
+      editsWritten += edits;
+    }
+
+    void incrementNanoTime(long nanos) {
+      nanosSpent += nanos;
+    }
+  }
+
+  /**
+   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
+   * data written to this output.
+   */
+  private final static class WriterAndPath extends SinkWriter {
+    final Path p;
+    final Writer w;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.p = p;
+      this.w = w;
+    }
+  }
+
+  /**
+   * Class that manages to replay edits from WAL files directly to assigned fail over region servers
+   */
+  class LogReplayOutputSink extends OutputSink {
+    private static final double BUFFER_THRESHOLD = 0.35;
+    private static final String KEY_DELIMITER = "#";
+
+    private long waitRegionOnlineTimeOut;
+    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
+    private final Map<String, RegionServerWriter> writers =
+        new ConcurrentHashMap<String, RegionServerWriter>();
+    // online encoded region name -> region location map
+    private final Map<String, HRegionLocation> onlineRegions =
+        new ConcurrentHashMap<String, HRegionLocation>();
+
+    private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
+        .synchronizedMap(new TreeMap<TableName, HConnection>());
+    /**
+     * Map key -> value layout
+     * <servername>:<table name> -> Queue<Row>
+     */
+    private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
+        new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
+    private List<Throwable> thrown = new ArrayList<Throwable>();
+
+    // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
+    // table. It's a limitation of distributedLogReplay. Because log replay needs a region is
+    // assigned and online before it can replay wal edits while regions of disabling/disabled table
+    // won't be assigned by AM. We can retire this code after HBASE-8234.
+    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
+    private boolean hasEditsInDisablingOrDisabledTables = false;
+
+    public LogReplayOutputSink(int numWriters) {
+      super(numWriters);
+      this.waitRegionOnlineTimeOut =
+          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
+            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
+      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
+      this.logRecoveredEditsOutputSink.setReporter(reporter);
+    }
+
+    @Override
+    void append(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn("got an empty buffer, skipping");
+        return;
+      }
+
+      // check if current region in a disabling or disabled table
+      if (disablingOrDisabledTables.contains(buffer.tableName)) {
+        // need fall back to old way
+        logRecoveredEditsOutputSink.append(buffer);
+        hasEditsInDisablingOrDisabledTables = true;
+        // store regions we have recovered so far
+        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
+        return;
+      }
+
+      // group entries by region servers
+      groupEditsByServer(entries);
+
+      // process workitems
+      String maxLocKey = null;
+      int maxSize = 0;
+      List<Pair<HRegionLocation, Entry>> maxQueue = null;
+      synchronized (this.serverToBufferQueueMap) {
+        for (String key : this.serverToBufferQueueMap.keySet()) {
+          List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key);
+          if (curQueue.size() > maxSize) {
+            maxSize = curQueue.size();
+            maxQueue = curQueue;
+            maxLocKey = key;
+          }
+        }
+        if (maxSize < minBatchSize
+            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
+          // buffer more to process
+          return;
+        } else if (maxSize > 0) {
+          this.serverToBufferQueueMap.remove(maxLocKey);
+        }
+      }
+
+      if (maxSize > 0) {
+        processWorkItems(maxLocKey, maxQueue);
+      }
+    }
+
+    private void addToRecoveredRegions(String encodedRegionName) {
+      if (!recoveredRegions.contains(encodedRegionName)) {
+        recoveredRegions.add(encodedRegionName);
+      }
+    }
+
+    /**
+     * Helper function to group WALEntries to individual region servers
+     * @throws IOException
+     */
+    private void groupEditsByServer(List<Entry> entries) throws IOException {
+      Set<TableName> nonExistentTables = null;
+      Long cachedLastFlushedSequenceId = -1l;
+      for (Entry entry : entries) {
+        WALEdit edit = entry.getEdit();
+        TableName table = entry.getKey().getTablename();
+        // clear scopes which isn't needed for recovery
+        entry.getKey().setScopes(null);
+        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
+        // skip edits of non-existent tables
+        if (nonExistentTables != null && nonExistentTables.contains(table)) {
+          this.skippedEdits.incrementAndGet();
+          continue;
+        }
+
+        Map<byte[], Long> maxStoreSequenceIds = null;
+        boolean needSkip = false;
+        HRegionLocation loc = null;
+        String locKey = null;
+        List<Cell> cells = edit.getCells();
+        List<Cell> skippedCells = new ArrayList<Cell>();
+        HConnection hconn = this.getConnectionByTableName(table);
+
+        for (Cell cell : cells) {
+          byte[] row = cell.getRow();
+          byte[] family = cell.getFamily();
+          boolean isCompactionEntry = false;
+          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
+            if (compaction != null && compaction.hasRegionName()) {
+              try {
+                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
+                  .toByteArray());
+                row = regionName[1]; // startKey of the region
+                family = compaction.getFamilyName().toByteArray();
+                isCompactionEntry = true;
+              } catch (Exception ex) {
+                LOG.warn("Unexpected exception received, ignoring " + ex);
+                skippedCells.add(cell);
+                continue;
+              }
+            } else {
+              skippedCells.add(cell);
+              continue;
+            }
+          }
+
+          try {
+            loc =
+                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
+                  encodeRegionNameStr);
+            // skip replaying the compaction if the region is gone
+            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
+              loc.getRegionInfo().getEncodedName())) {
+              LOG.info("Not replaying a compaction marker for an older region: "
+                  + encodeRegionNameStr);
+              needSkip = true;
+            }
+          } catch (TableNotFoundException ex) {
+            // table has been deleted so skip edits of the table
+            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
+                + encodeRegionNameStr);
+            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
+            if (nonExistentTables == null) {
+              nonExistentTables = new TreeSet<TableName>();
+            }
+            nonExistentTables.add(table);
+            this.skippedEdits.incrementAndGet();
+            needSkip = true;
+            break;
+          }
+
+          cachedLastFlushedSequenceId =
+              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
+          if (cachedLastFlushedSequenceId != null
+              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+            // skip the whole WAL entry
+            this.skippedEdits.incrementAndGet();
+            needSkip = true;
+            break;
+          } else {
+            if (maxStoreSequenceIds == null) {
+              maxStoreSequenceIds =
+                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
+            }
+            if (maxStoreSequenceIds != null) {
+              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
+              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
+                // skip current kv if column family doesn't exist anymore or already flushed
+                skippedCells.add(cell);
+                continue;
+              }
+            }
+          }
+        }
+
+        // skip the edit
+        if (loc == null || needSkip) continue;
+
+        if (!skippedCells.isEmpty()) {
+          cells.removeAll(skippedCells);
+        }
+
+        synchronized (serverToBufferQueueMap) {
+          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
+          List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
+          if (queue == null) {
+            queue =
+                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
+            serverToBufferQueueMap.put(locKey, queue);
+          }
+          queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
+        }
+        // store regions we have recovered so far
+        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
+      }
+    }
+
+    /**
+     * Locate destination region based on table name & row. This function also makes sure the
+     * destination region is online for replay.
+     * @throws IOException
+     */
+    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
+        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
+      // fetch location from cache
+      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
+      if(loc != null) return loc;
+      // fetch location from hbase:meta directly without using cache to avoid hit old dead server
+      loc = hconn.getRegionLocation(table, row, true);
+      if (loc == null) {
+        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
+            + " of table:" + table);
+      }
+      // check if current row moves to a different region due to region merge/split
+      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
+        // originalEncodedRegionName should have already flushed
+        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
+        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
+        if (tmpLoc != null) return tmpLoc;
+      }
+
+      Long lastFlushedSequenceId = -1l;
+      AtomicBoolean isRecovering = new AtomicBoolean(true);
+      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
+      if (!isRecovering.get()) {
+        // region isn't in recovering at all because WAL file may contain a region that has
+        // been moved to somewhere before hosting RS fails
+        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
+        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
+            + " because it's not in recovering.");
+      } else {
+        Long cachedLastFlushedSequenceId =
+            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
+
+        // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
+        // update the value for the region
+        RegionStoreSequenceIds ids =
+            csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
+              loc.getRegionInfo().getEncodedName());
+        if (ids != null) {
+          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+          Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
+          for (StoreSequenceId id : maxSeqIdInStores) {
+            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
+          }
+          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
+        }
+
+        if (cachedLastFlushedSequenceId == null
+            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
+          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
+        }
+      }
+
+      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
+      return loc;
+    }
+
+    private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
+        throws IOException {
+      RegionServerWriter rsw = null;
+
+      long startTime = System.nanoTime();
+      try {
+        rsw = getRegionServerWriter(key);
+        rsw.sink.replayEntries(actions);
+
+        // Pass along summary statistics
+        rsw.incrementEdits(actions.size());
+        rsw.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(" Got while writing log entry to log", e);
+        throw e;
+      }
+    }
+
+    /**
+     * Wait until region is online on the destination region server
+     * @param loc
+     * @param row
+     * @param timeout How long to wait
+     * @param isRecovering Recovering state of the region interested on destination region server.
+     * @return True when region is online on the destination region server
+     * @throws InterruptedException
+     */
+    private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
+        final long timeout, AtomicBoolean isRecovering)
+        throws IOException {
+      final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
+      final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      boolean reloadLocation = false;
+      TableName tableName = loc.getRegionInfo().getTable();
+      int tries = 0;
+      Throwable cause = null;
+      while (endTime > EnvironmentEdgeManager.currentTime()) {
+        try {
+          // Try and get regioninfo from the hosting server.
+          HConnection hconn = getConnectionByTableName(tableName);
+          if(reloadLocation) {
+            loc = hconn.getRegionLocation(tableName, row, true);
+          }
+          BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
+          HRegionInfo region = loc.getRegionInfo();
+          try {
+            GetRegionInfoRequest request =
+                RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
+            GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
+            if (HRegionInfo.convert(response.getRegionInfo()) != null) {
+              isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
+              return loc;
+            }
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        } catch (IOException e) {
+          cause = e.getCause();
+          if(!(cause instanceof RegionOpeningException)) {
+            reloadLocation = true;
+          }
+        }
+        long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
+        try {
+          Thread.sleep(expectedSleep);
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted when waiting region " +
+              loc.getRegionInfo().getEncodedName() + " online.", e);
+        }
+        tries++;
+      }
+
+      throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
+        " online for " + timeout + " milliseconds.", cause);
+    }
+
+    @Override
+    protected boolean flush() throws IOException {
+      String curLoc = null;
+      int curSize = 0;
+      List<Pair<HRegionLocation, Entry>> curQueue = null;
+      synchronized (this.serverToBufferQueueMap) {
+        for (String locationKey : this.serverToBufferQueueMap.keySet()) {
+          curQueue = this.serverToBufferQueueMap.get(locationKey);
+          if (!curQueue.isEmpty()) {
+            curSize = curQueue.size();
+            curLoc = locationKey;
+            break;
+          }
+        }
+        if (curSize > 0) {
+          this.serverToBufferQueueMap.remove(curLoc);
+        }
+      }
+
+      if (curSize > 0) {
+        this.processWorkItems(curLoc, curQueue);
+        // We should already have control of the monitor; ensure this is the case.
+        synchronized(dataAvailable) {
+          dataAvailable.notifyAll();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    void addWriterError(Throwable t) {
+      thrown.add(t);
+    }
+
+    @Override
+    List<Path> finishWritingAndClose() throws IOException {
+      try {
+        if (!finishWriting()) {
+          return null;
+        }
+        if (hasEditsInDisablingOrDisabledTables) {
+          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
+        } else {
+          splits = new ArrayList<Path>();
+        }
+        // returns an empty array in order to keep interface same as old way
+        return splits;
+      } finally {
+        List<IOException> thrown = closeRegionServerWriters();
+        if (thrown != null && !thrown.isEmpty()) {
+          throw MultipleIOException.createIOException(thrown);
+        }
+      }
+    }
+
+    @Override
+    int getNumOpenWriters() {
+      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
+    }
+
+    private List<IOException> closeRegionServerWriters() throws IOException {
+      List<IOException> result = null;
+      if (!writersClosed) {
+        result = Lists.newArrayList();
+        try {
+          for (WriterThread t : writerThreads) {
+            while (t.isAlive()) {
+              t.shouldStop = true;
+              t.interrupt();
+              try {
+                t.join(10);
+              } catch (InterruptedException e) {
+                IOException iie = new InterruptedIOException();
+                iie.initCause(e);
+                throw iie;
+              }
+            }
+          }
+        } finally {
+          synchronized (writers) {
+            for (String locationKey : writers.keySet()) {
+              RegionServerWriter tmpW = writers.get(locationKey);
+              try {
+                tmpW.close();
+              } catch (IOException ioe) {
+                LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
+                result.add(ioe);
+              }
+            }
+          }
+
+          // close connections
+          synchronized (this.tableNameToHConnectionMap) {
+            for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
+              HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
+              try {
+                hconn.clearRegionCache();
+                hconn.close();
+              } catch (IOException ioe) {
+                result.add(ioe);
+              }
+            }
+          }
+          writersClosed = true;
+        }
+      }
+      return result;
+    }
+
+    @Override
+    Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      synchronized (writers) {
+        for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
+          ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+        }
+      }
+      return ret;
+    }
+
+    @Override
+    int getNumberOfRecoveredRegions() {
+      return this.recoveredRegions.size();
+    }
+
+    /**
+     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+     * long as multiple threads are always acting on different regions.
+     * @return null if this region shouldn't output any logs
+     */
+    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
+      RegionServerWriter ret = writers.get(loc);
+      if (ret != null) {
+        return ret;
+      }
+
+      TableName tableName = getTableFromLocationStr(loc);
+      if(tableName == null){
+        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
+      }
+
+      HConnection hconn = getConnectionByTableName(tableName);
+      synchronized (writers) {
+        ret = writers.get(loc);
+        if (ret == null) {
+          ret = new RegionServerWriter(conf, tableName, hconn);
+          writers.put(loc, ret);
+        }
+      }
+      return ret;
+    }
+
+    private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
+      HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
+      if (hconn == null) {
+        synchronized (this.tableNameToHConnectionMap) {
+          hconn = this.tableNameToHConnectionMap.get(tableName);
+          if (hconn == null) {
+            hconn = HConnectionManager.getConnection(conf);
+            this.tableNameToHConnectionMap.put(tableName, hconn);
+          }
+        }
+      }
+      return hconn;
+    }
+    private TableName getTableFromLocationStr(String loc) {
+      /**
+       * location key is in format <server name:port>#<table name>
+       */
+      String[] splits = loc.split(KEY_DELIMITER);
+      if (splits.length != 2) {
+        return null;
+      }
+      return TableName.valueOf(splits[1]);
+    }
+  }
+
+  /**
+   * Private data structure that wraps a receiving RS and collecting statistics about the data
+   * written to this newly assigned RS.
+   */
+  private final static class RegionServerWriter extends SinkWriter {
+    final WALEditsReplaySink sink;
+
+    RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
+        throws IOException {
+      this.sink = new WALEditsReplaySink(conf, tableName, conn);
+    }
+
+    void close() throws IOException {
+    }
+  }
+
+  static class CorruptedLogFileException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    CorruptedLogFileException(String s) {
+      super(s);
+    }
+  }
+
+  /** A struct used by getMutationsFromWALEntry */
+  public static class MutationReplay {
+    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
+      this.type = type;
+      this.mutation = mutation;
+      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
+        // using ASYNC_WAL for relay
+        this.mutation.setDurability(Durability.ASYNC_WAL);
+      }
+      this.nonceGroup = nonceGroup;
+      this.nonce = nonce;
+    }
+
+    public final MutationType type;
+    public final Mutation mutation;
+    public final long nonceGroup;
+    public final long nonce;
+  }
+
+  /**
+   * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
+   * WALEdit from the passed in WALEntry
+   * @param entry
+   * @param cells
+   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
+   *          extracted from the passed in WALEntry.
+   * @return list of Pair<MutationType, Mutation> to be replayed
+   * @throws IOException
+   */
+  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
+      Pair<WALKey, WALEdit> logEntry) throws IOException {
+
+    if (entry == null) {
+      // return an empty array
+      return new ArrayList<MutationReplay>();
+    }
+
+    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
+      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
+    int count = entry.getAssociatedCellCount();
+    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
+    Cell previousCell = null;
+    Mutation m = null;
+    WALKey key = null;
+    WALEdit val = null;
+    if (logEntry != null) val = new WALEdit();
+
+    for (int i = 0; i < count; i++) {
+      // Throw index out of bounds if our cell count is off
+      if (!cells.advance()) {
+        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+      }
+      Cell cell = cells.current();
+      if (val != null) val.add(cell);
+
+      boolean isNewRowOrType =
+          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+              || !CellUtil.matchingRow(previousCell, cell);
+      if (isNewRowOrType) {
+        // Create new mutation
+        if (CellUtil.isDelete(cell)) {
+          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          // Deletes don't have nonces.
+          mutations.add(new MutationReplay(
+              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
+        } else {
+          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          // Puts might come from increment or append, thus we need nonces.
+          long nonceGroup = entry.getKey().hasNonceGroup()
+              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
+          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
+          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
+        }
+      }
+      if (CellUtil.isDelete(cell)) {
+        ((Delete) m).addDeleteMarker(cell);
+      } else {
+        ((Put) m).add(cell);
+      }
+      previousCell = cell;
+    }
+
+    // reconstruct WALKey
+    if (logEntry != null) {
+      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
+      List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
+      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
+        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
+      }
+      // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+      key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
+              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
+              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
+      logEntry.setFirst(key);
+      logEntry.setSecond(val);
+    }
+
+    return mutations;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 54d47e5..325fe0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -157,7 +157,7 @@ public class ZKSplitLog {
   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
     long lastRecordedFlushedSequenceId = -1l;
     try {
-      lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
+      lastRecordedFlushedSequenceId = ZKUtil.parseWALPositionFrom(bytes);
     } catch (DeserializationException e) {
       lastRecordedFlushedSequenceId = -1l;
       LOG.warn("Can't parse last flushed sequence Id", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
index de2bf64..8dba04c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -140,7 +140,7 @@ public abstract class HBaseTestCase extends TestCase {
 
   /**
    * You must call close on the returned region and then close on the log file
-   * it created. Do {@link HRegion#close()} followed by {@link HRegion#getLog()}
+   * it created. Do {@link HRegion#close()} followed by {@link HRegion#getWAL()}
    * and on it call close.
    * @param desc
    * @param startKey

http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 271401b..393c314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -89,7 +89,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.tool.Canary;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -568,7 +568,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     return this.dfsCluster;
   }
 
-  public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException {
+  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
     createDirsAndSetProperties();
     dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
         null, null, null);
@@ -1638,18 +1638,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   }
 
   /**
-   * Create an HRegion that writes to the local tmp dirs with specified hlog
+   * Create an HRegion that writes to the local tmp dirs with specified wal
    * @param info regioninfo
    * @param desc table descriptor
-   * @param hlog hlog for this region.
+   * @param wal wal for this region.
    * @return created hregion
    * @throws IOException
    */
-  public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, HLog hlog) throws IOException {
-    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, hlog);
+  public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, WAL wal)
+      throws IOException {
+    return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
   }
 
-
   /**
    * @param tableName
    * @param startKey
@@ -1664,7 +1664,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
-      HLog hlog, byte[]... families) throws IOException {
+      WAL wal, byte[]... families) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
     htd.setReadOnly(isReadOnly);
     for (byte[] family : families) {
@@ -1675,7 +1675,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     }
     htd.setDurability(durability);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
-    return createLocalHRegion(info, htd, hlog);
+    return createLocalHRegion(info, htd, wal);
   }
   //
   // ==========================================================================
@@ -1854,7 +1854,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
           Put put = new Put(k);
           put.setDurability(Durability.SKIP_WAL);
           put.add(f, null, k);
-          if (r.getLog() == null) put.setDurability(Durability.SKIP_WAL);
+          if (r.getWAL() == null) put.setDurability(Durability.SKIP_WAL);
 
           int preRowCount = rowCount;
           int pause = 10;
@@ -2872,7 +2872,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
    * makes tests linger.  Here is the exception you'll see:
    * <pre>
-   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
+   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
    * </pre>
    * @param stream A DFSClient.DFSOutputStream.
    * @param max