You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/15 06:25:58 UTC

svn commit: r1482676 [4/5] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ hbase-client/src/main/ja...

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed May 15 04:25:57 2013
@@ -23,9 +23,11 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +36,8 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -41,6 +45,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -51,12 +57,30 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -67,9 +91,17 @@ import org.apache.hadoop.hbase.util.Canc
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -102,6 +134,9 @@ public class HLogSplitter {
   OutputSink outputSink;
   EntryBuffers entryBuffers;
 
+  private Set<String> disablingOrDisabledTables = new HashSet<String>();
+  private ZooKeeperWatcher watcher;
+
   // If an exception is thrown by one of the other threads, it will be
   // stored here.
   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
@@ -115,10 +150,21 @@ public class HLogSplitter {
   // For checking the latest flushed sequence id
   protected final LastSequenceId sequenceIdChecker;
 
+  protected boolean distributedLogReplay;
+
+  // Map encodedRegionName -> lastFlushedSequenceId
+  Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
+
+  // Number of writer threads
+  private final int numWriterThreads;
+
+  // Min batch size when replaying WAL edits
+  private final int minBatchSize;
+  
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
-   * <code>hbase.hlog.splitter.impl</code> property to derived the instance
-   * class to use.
+   * <code>hbase.hlog.splitter.impl</code> property to derive the instance class to use.
+   * distributedLogReplay won't be enabled by this constructor.
    * <p>
    * @param conf
    * @param rootDir hbase directory
@@ -161,17 +207,37 @@ public class HLogSplitter {
 
   public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
       Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
+      this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, null);
+  }
+
+  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+      Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
     this.conf = conf;
     this.rootDir = rootDir;
     this.srcDir = srcDir;
     this.oldLogDir = oldLogDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
+    this.watcher = zkw;
 
     entryBuffers = new EntryBuffers(
         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
             128*1024*1024));
-    outputSink = new OutputSink();
+
+    this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+
+    this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    if (zkw != null && this.distributedLogReplay) {
+      outputSink = new LogReplayOutputSink(numWriterThreads);
+    } else {
+      if (this.distributedLogReplay) {
+        LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
+      }
+      this.distributedLogReplay = false;
+      outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
+    }
   }
 
   /**
@@ -260,26 +326,26 @@ public class HLogSplitter {
   }
 
   /**
-   * Splits the HLog edits in the given list of logfiles (that are a mix of edits
-   * on multiple regions) by region and then splits them per region directories,
-   * in batches of (hbase.hlog.split.batch.size)
+   * Splits or replays the HLog edits in the given list of logfiles (that are a mix of edits on
+   * multiple regions) by region and then splits (or replays, when distributedLogReplay is true)
+   * them per region directories, in batches.
    * <p>
-   * This process is split into multiple threads. In the main thread, we loop
-   * through the logs to be split. For each log, we:
+   * This process is split into multiple threads. In the main thread, we loop through the logs to be
+   * split. For each log, we:
    * <ul>
-   *   <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
-   *   <li> Read each edit (see {@link #parseHLog}</li>
-   *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
+   * <li>Recover it (take and drop HDFS lease) to ensure no other process can write</li>
+   * <li>Read each edit (see {@link #parseHLog})</li>
+   * <li>Mark as "processed" or "corrupt" depending on outcome</li>
    * </ul>
    * <p>
-   * Each edit is passed into the EntryBuffers instance, which takes care of
-   * memory accounting and splitting the edits by region.
+   * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
+   * splitting the edits by region.
    * <p>
-   * The OutputSink object then manages N other WriterThreads which pull chunks
-   * of edits from EntryBuffers and write them to the output region directories.
+   * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
+   * EntryBuffers and either write them to recovered.edits files or replay them to newly assigned
+   * region servers directly.
    * <p>
-   * After the process is complete, the log files are archived to a separate
-   * directory.
+   * After the process is complete, the log files are archived to a separate directory.
    */
   private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
       throws IOException {
@@ -368,8 +434,7 @@ public class HLogSplitter {
   /**
    * Splits a HLog file into region's recovered-edits directory
    * <p>
-   * If the log file has N regions then N recovered.edits files will be
-   * produced.
+   * If the log file has N regions then N recovered.edits files will be produced.
    * <p>
    * @param rootDir
    * @param logfile
@@ -377,22 +442,23 @@ public class HLogSplitter {
    * @param conf
    * @param reporter
    * @param idChecker
+   * @param zkw ZooKeeperWatcher; if it's null, we fall back to the old-style log splitting where
+   *          we dump out recovered.edits files for regions to replay.
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
-      FileSystem fs, Configuration conf, CancelableProgressable reporter,
-      LastSequenceId idChecker)
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
+      ZooKeeperWatcher zkw)
       throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs, idChecker);
+    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null/* oldLogDir */, fs, idChecker, zkw);
     return s.splitLogFile(logfile, reporter);
   }
 
   /**
    * Splits a HLog file into region's recovered-edits directory
    * <p>
-   * If the log file has N regions then N recovered.edits files will be
-   * produced.
+   * If the log file has N regions then N recovered.edits files will be produced.
    * <p>
    * @param rootDir
    * @param logfile
@@ -402,10 +468,10 @@ public class HLogSplitter {
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
-      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+      Configuration conf, CancelableProgressable reporter)
       throws IOException {
-    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null);
+    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
   }
 
   public boolean splitLogFile(FileStatus logfile,
@@ -427,6 +493,7 @@ public class HLogSplitter {
         "into a temporary staging area.");
       long logLength = logfile.getLen();
       LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
+      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
@@ -445,25 +512,37 @@ public class HLogSplitter {
         LOG.warn("Nothing to split in log file " + logPath);
         return true;
       }
+      if(watcher != null) {
+        try {
+          disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
+        } catch (KeeperException e) {
+          throw new IOException("Can't get disabling/disabled tables", e);
+        }
+      }
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
       int numOpenedFilesLastCheck = 0;
       outputSink.setReporter(reporter);
       outputSink.startWriterThreads();
       outputSinkStarted = true;
-      // Report progress every so many edits and/or files opened (opening a file
-      // takes a bit of time).
-      Map<byte[], Long> lastFlushedSequenceIds =
-        new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       Entry entry;
-
-      while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
+      Long lastFlushedSequenceId = -1L;
+      ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logPath);
+      String serverNameStr = (serverName == null) ? "" : serverName.getServerName(); 
+      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
-        Long lastFlushedSequenceId = -1l;
-        if (sequenceIdChecker != null) {
-          lastFlushedSequenceId = lastFlushedSequenceIds.get(region);
-          if (lastFlushedSequenceId == null) {
+        String key = Bytes.toString(region);
+        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
+        if (lastFlushedSequenceId == null) {
+          if (this.distributedLogReplay) {
+            lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(this.watcher,
+              serverNameStr, key);
+          } else if (sequenceIdChecker != null) {
             lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
-            lastFlushedSequenceIds.put(region, lastFlushedSequenceId);
+          }
+          if (lastFlushedSequenceId != null) {
+            lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
+          } else {
+            lastFlushedSequenceId = -1L;
           }
         }
         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -472,12 +551,13 @@ public class HLogSplitter {
         }
         entryBuffers.appendEntry(entry);
         editsCount++;
+        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
         // If sufficient edits have passed, check if we should report progress.
         if (editsCount % interval == 0
-            || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
-          numOpenedFilesLastCheck = outputSink.logWriters.size();
-          String countsStr = (editsCount - editsSkipped) +
-            " edits, skipped " + editsSkipped + " edits.";
+            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
+          numOpenedFilesLastCheck = this.getNumOpenWriters();
+          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+              + " edits, skipped " + editsSkipped + " edits.";
           status.setStatus("Split " + countsStr);
           if (reporter != null && !reporter.progress()) {
             progress_failed = true;
@@ -502,9 +582,8 @@ public class HLogSplitter {
         progress_failed = outputSink.finishWritingAndClose() == null;
       }
       String msg = "Processed " + editsCount + " edits across "
-        + outputSink.getOutputCounts().size() + " regions; log file="
-        + logPath + " is corrupted = " + isCorrupted + " progress failed = "
-        + progress_failed;
+          + outputSink.getNumberOfRecoveredRegions() + " regions; log file=" + logPath
+          + " is corrupted = " + isCorrupted + " progress failed = " + progress_failed;
       LOG.info(msg);
       status.markComplete(msg);
     }
@@ -832,6 +911,18 @@ public class HLogSplitter {
   }
 
   /**
+   * Get the number of currently open writers.
+   * @return the number of open writers reported by the output sink, or 0 if there is no sink
+   */
+  private int getNumOpenWriters() {
+    int result = 0;
+    if (this.outputSink != null) {
+      result += this.outputSink.getNumOpenWriters();
+    }
+    return result;
+  }
+
+  /**
    * Class which accumulates edits and separates them into a buffer per region
    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
    * a predefined threshold.
@@ -880,20 +971,23 @@ public class HLogSplitter {
         totalBuffered += incrHeap;
         while (totalBuffered > maxHeapUsage && thrown.get() == null) {
           LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
-          dataAvailable.wait(3000);
+          dataAvailable.wait(2000);
         }
         dataAvailable.notifyAll();
       }
       checkForErrors();
     }
 
+    /**
+     * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+     */
     synchronized RegionEntryBuffer getChunkToWrite() {
-      long biggestSize=0;
-      byte[] biggestBufferKey=null;
+      long biggestSize = 0;
+      byte[] biggestBufferKey = null;
 
       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
         long size = entry.getValue().heapSize();
-        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
           biggestSize = size;
           biggestBufferKey = entry.getKey();
         }
@@ -968,9 +1062,11 @@ public class HLogSplitter {
 
   class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
+    private OutputSink outputSink = null;
 
-    WriterThread(int i) {
+    WriterThread(OutputSink sink, int i) {
       super("WriterThread-" + i);
+      outputSink = sink;
     }
 
     public void run()  {
@@ -989,9 +1085,11 @@ public class HLogSplitter {
         if (buffer == null) {
           // No data currently available, wait on some more to show up
           synchronized (dataAvailable) {
-            if (shouldStop) return;
+            if (shouldStop && !this.outputSink.flush()) {
+              return;
+            }
             try {
-              dataAvailable.wait(1000);
+              dataAvailable.wait(500);
             } catch (InterruptedException ie) {
               if (!shouldStop) {
                 throw new RuntimeException(ie);
@@ -1012,39 +1110,7 @@ public class HLogSplitter {
 
 
     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.entryBuffer;
-      if (entries.isEmpty()) {
-        LOG.warn(this.getName() + " got an empty buffer, skipping");
-        return;
-      }
-
-      WriterAndPath wap = null;
-
-      long startTime = System.nanoTime();
-      try {
-        int editsCount = 0;
-
-        for (Entry logEntry : entries) {
-          if (wap == null) {
-            wap = outputSink.getWriterAndPath(logEntry);
-            if (wap == null) {
-              // getWriterAndPath decided we don't need to write these edits
-              // Message was already logged
-              return;
-            }
-          }
-          wap.w.append(logEntry);
-          outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
-          editsCount++;
-        }
-        // Pass along summary statistics
-        wap.incrementEdits(editsCount);
-        wap.incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = RemoteExceptionHandler.checkIOException(e);
-        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
-        throw e;
-      }
+      outputSink.append(buffer);
     }
 
     void finish() {
@@ -1055,28 +1121,6 @@ public class HLogSplitter {
     }
   }
 
-  private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
-      FileSystem fs, Configuration conf)
-  throws IOException {
-    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
-    if (regionedits == null) {
-      return null;
-    }
-    if (fs.exists(regionedits)) {
-      LOG.warn("Found existing old edits file. It could be the "
-          + "result of a previous failed split attempt. Deleting "
-          + regionedits + ", length="
-          + fs.getFileStatus(regionedits).getLen());
-      if (!fs.delete(regionedits, false)) {
-        LOG.warn("Failed delete of old " + regionedits);
-      }
-    }
-    Writer w = createWriter(fs, regionedits, conf);
-    LOG.debug("Creating writer path=" + regionedits + " region="
-        + Bytes.toStringBinary(region));
-    return (new WriterAndPath(regionedits, w));
-  }
-
   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
     List<String> components = new ArrayList<String>(10);
     do {
@@ -1109,35 +1153,35 @@ public class HLogSplitter {
   }
 
   /**
-   * Class that manages the output streams from the log splitting process.
+   * Abstract base class providing a common interface that supports both the existing
+   * recovered edits file sink and the region server WAL edits replay sink.
    */
-  class OutputSink {
-    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
-          new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
-    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+   abstract class OutputSink {
+
+    protected Map<byte[], SinkWriter> writers = Collections
+        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
+
+    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
-    private final List<WriterThread> writerThreads = Lists.newArrayList();
+
+    protected final List<WriterThread> writerThreads = Lists.newArrayList();
 
     /* Set of regions which we've decided should not output edits */
-    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
-        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+    protected final Set<byte[]> blacklistedRegions = Collections
+        .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
 
-    private boolean closeAndCleanCompleted = false;
+    protected boolean closeAndCleanCompleted = false;
 
-    private boolean logWritersClosed  = false;
+    protected boolean writersClosed = false;
 
-    private final int numThreads;
+    protected final int numThreads;
 
-    private CancelableProgressable reporter = null;
+    protected CancelableProgressable reporter = null;
 
-    public OutputSink() {
-      // More threads could potentially write faster at the expense
-      // of causing more disk seeks as the logs are split.
-      // 3. After a certain setting (probably around 3) the
-      // process will be bound on the reader in the current
-      // implementation anyway.
-      numThreads = conf.getInt(
-          "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    protected AtomicLong skippedEdits = new AtomicLong();
+
+    public OutputSink(int numWriters) {
+      numThreads = numWriters;
     }
 
     void setReporter(CancelableProgressable reporter) {
@@ -1145,12 +1189,11 @@ public class HLogSplitter {
     }
 
     /**
-     * Start the threads that will pump data from the entryBuffers
-     * to the output files.
+     * Start the threads that will pump data from the entryBuffers to the output files.
      */
     synchronized void startWriterThreads() {
       for (int i = 0; i < numThreads; i++) {
-        WriterThread t = new WriterThread(i);
+        WriterThread t = new WriterThread(this, i);
         t.start();
         writerThreads.add(t);
       }
@@ -1158,66 +1201,147 @@ public class HLogSplitter {
 
     /**
      *
-     * @return null if failed to report progress
+     * Update region's maximum edit log SeqNum.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      synchronized (regionMaximumEditLogSeqNum) {
+        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
+            .getEncodedRegionName());
+        if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
+              .getLogSeqNum());
+        }
+      }
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
+    /**
+     * @return the number of currently opened writers
+     */
+    int getNumOpenWriters() {
+      return this.writers.size();
+    }
+
+    long getSkippedEdits() {
+      return this.skippedEdits.get();
+    }
+
+    /**
+     * Wait for writer threads to dump all info to the sink
+     * @return true when there is no error
      * @throws IOException
      */
-    List<Path> finishWritingAndClose() throws IOException {
+    protected boolean finishWriting() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
       boolean progress_failed = false;
-      try {
-        for (WriterThread t : writerThreads) {
-          t.finish();
-        }
-        for (WriterThread t : writerThreads) {
-          if (!progress_failed && reporter != null && !reporter.progress()) {
-            progress_failed = true;
-          }
-          try {
-            t.join();
-          } catch (InterruptedException ie) {
-            IOException iie = new InterruptedIOException();
-            iie.initCause(ie);
-            throw iie;
-          }
-          checkForErrors();
+      for (WriterThread t : writerThreads) {
+        t.finish();
+      }
+      for (WriterThread t : writerThreads) {
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
         }
-        LOG.info("Split writers finished");
-        if (progress_failed) {
-          return null;
+        try {
+          t.join();
+        } catch (InterruptedException ie) {
+          IOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
         }
-        return closeStreams();
+        checkForErrors();
+      }
+      LOG.info("Split writers finished");
+      return (!progress_failed);
+    }
+
+    abstract List<Path> finishWritingAndClose() throws IOException;
+
+    /**
+     * @return a map from encoded region ID to the number of edits written out for that region.
+     */
+    abstract Map<byte[], Long> getOutputCounts();
+
+    /**
+     * @return number of regions we've recovered
+     */
+    abstract int getNumberOfRecoveredRegions();
+
+    /**
+     * @param buffer A buffer of WAL Edit Entries
+     * @throws IOException
+     */
+    abstract void append(RegionEntryBuffer buffer) throws IOException;
+
+    /**
+     * WriterThread calls this function to flush any edits remaining in the buffer before close
+     * @return true when underlying sink has something to flush
+     */
+    protected boolean flush() throws IOException {
+      return false;
+    }
+  }
+
+  /**
+   * Class that manages the output streams from the log splitting process.
+   */
+  class LogRecoveredEditsOutputSink extends OutputSink {
+
+    public LogRecoveredEditsOutputSink(int numWriters) {
+      // More threads could potentially write faster at the expense
+      // of causing more disk seeks as the logs are split.
+      // 3. After a certain setting (probably around 3) the
+      // process will be bound on the reader in the current
+      // implementation anyway.
+      super(numWriters);
+    }
+
+    /**
+     * @return null if failed to report progress
+     * @throws IOException
+     */
+    @Override
+    List<Path> finishWritingAndClose() throws IOException {
+      boolean isSuccessful = false;
+      List<Path> result = null;
+      try {
+        isSuccessful = finishWriting();
       } finally {
+        result = close();
         List<IOException> thrown = closeLogWriters(null);
         if (thrown != null && !thrown.isEmpty()) {
           throw MultipleIOException.createIOException(thrown);
         }
       }
+      return (isSuccessful) ? result : null;
     }
 
     /**
      * Close all of the output streams.
      * @return the list of paths written.
      */
-    private List<Path> closeStreams() throws IOException {
+    private List<Path> close() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
 
       final List<Path> paths = new ArrayList<Path>();
       final List<IOException> thrown = Lists.newArrayList();
-      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(
-          numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
-            private int count = 1;
-            public Thread newThread(Runnable r) {
-              Thread t = new Thread(r, "split-log-closeStream-" + count++);
-              return t;
-            }
-          });
+      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
+        TimeUnit.SECONDS, new ThreadFactory() {
+          private int count = 1;
+
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, "split-log-closeStream-" + count++);
+            return t;
+          }
+        });
       CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
           closeThreadPool);
-      for (final Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
-          .entrySet()) {
+      for (final Map.Entry<byte[], ? extends SinkWriter> writersEntry : writers.entrySet()) {
         completionService.submit(new Callable<Void>() {
           public Void call() throws Exception {
-            WriterAndPath wap = logWritersEntry.getValue();
+            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
             try {
               wap.w.close();
             } catch (IOException ioe) {
@@ -1225,15 +1349,25 @@ public class HLogSplitter {
               thrown.add(ioe);
               return null;
             }
-            LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
-                + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+            LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+                + (wap.nanosSpent / 1000 / 1000) + "ms)");
+
+            if (wap.editsWritten == 0) {
+              // just remove the empty recovered.edits file
+              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
+                LOG.warn("Failed deleting empty " + wap.p);
+                throw new IOException("Failed deleting empty  " + wap.p);
+              }
+              return null;
+            }
+
             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-                regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
+              regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
             try {
               if (!dst.equals(wap.p) && fs.exists(dst)) {
                 LOG.warn("Found existing old edits file. It could be the "
-                    + "result of a previous failed split attempt. Deleting "
-                    + dst + ", length=" + fs.getFileStatus(dst).getLen());
+                    + "result of a previous failed split attempt. Deleting " + dst + ", length="
+                    + fs.getFileStatus(dst).getLen());
                 if (!fs.delete(dst, false)) {
                   LOG.warn("Failed deleting of old " + dst);
                   throw new IOException("Failed deleting of old " + dst);
@@ -1244,8 +1378,7 @@ public class HLogSplitter {
               // TestHLogSplit#testThreading is an example.
               if (fs.exists(wap.p)) {
                 if (!fs.rename(wap.p, dst)) {
-                  throw new IOException("Failed renaming " + wap.p + " to "
-                      + dst);
+                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
                 }
                 LOG.debug("Rename " + wap.p + " to " + dst);
               }
@@ -1262,7 +1395,7 @@ public class HLogSplitter {
 
       boolean progress_failed = false;
       try {
-        for (int i = 0, n = logWriters.size(); i < n; i++) {
+        for (int i = 0, n = this.writers.size(); i < n; i++) {
           Future<Void> future = completionService.take();
           future.get();
           if (!progress_failed && reporter != null && !reporter.progress()) {
@@ -1282,7 +1415,7 @@ public class HLogSplitter {
       if (!thrown.isEmpty()) {
         throw MultipleIOException.createIOException(thrown);
       }
-      logWritersClosed = true;
+      writersClosed = true;
       closeAndCleanCompleted = true;
       if (progress_failed) {
         return null;
@@ -1290,57 +1423,58 @@ public class HLogSplitter {
       return paths;
     }
 
-    private List<IOException> closeLogWriters(List<IOException> thrown)
-        throws IOException {
-      if (!logWritersClosed) {
-        if (thrown == null) {
-          thrown = Lists.newArrayList();
-        }
-        try {
-          for (WriterThread t : writerThreads) {
-            while (t.isAlive()) {
-              t.shouldStop = true;
-              t.interrupt();
-              try {
-                t.join(10);
-              } catch (InterruptedException e) {
-                IOException iie = new InterruptedIOException();
-                iie.initCause(e);
-                throw iie;
-              }
+    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
+      if (writersClosed) {
+        return thrown;
+      }
+
+      if (thrown == null) {
+        thrown = Lists.newArrayList();
+      }
+      try {
+        for (WriterThread t : writerThreads) {
+          while (t.isAlive()) {
+            t.shouldStop = true;
+            t.interrupt();
+            try {
+              t.join(10);
+            } catch (InterruptedException e) {
+              IOException iie = new InterruptedIOException();
+              iie.initCause(e);
+              throw iie;
             }
           }
-        } finally {
-          synchronized (logWriters) {
-            for (WriterAndPath wap : logWriters.values()) {
-              try {
-                wap.w.close();
-              } catch (IOException ioe) {
-                LOG.error("Couldn't close log at " + wap.p, ioe);
-                thrown.add(ioe);
-                continue;
-              }
-              LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
-                  + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        }
+      } finally {
+        synchronized (writers) {
+          WriterAndPath wap = null;
+          for (SinkWriter tmpWAP : writers.values()) {
+            try {
+              wap = (WriterAndPath) tmpWAP;
+              wap.w.close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close log at " + wap.p, ioe);
+              thrown.add(ioe);
+              continue;
             }
+            LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+                + (wap.nanosSpent / 1000 / 1000) + "ms)");
           }
-          logWritersClosed = true;
         }
+        writersClosed = true;
       }
+
       return thrown;
     }
 
     /**
-     * Get a writer and path for a log starting at the given entry.
-     *
-     * This function is threadsafe so long as multiple threads are always
-     * acting on different regions.
-     *
+     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+     * long as multiple threads are always acting on different regions.
      * @return null if this region shouldn't output any logs
      */
-    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
       byte region[] = entry.getKey().getEncodedRegionName();
-      WriterAndPath ret = logWriters.get(region);
+      WriterAndPath ret = (WriterAndPath) writers.get(region);
       if (ret != null) {
         return ret;
       }
@@ -1354,75 +1488,624 @@ public class HLogSplitter {
         blacklistedRegions.add(region);
         return null;
       }
-      logWriters.put(region, ret);
+      writers.put(region, ret);
       return ret;
     }
 
-    /**
-     * Update region's maximum edit log SeqNum.
-     */
-    void updateRegionMaximumEditLogSeqNum(Entry entry) {
-      synchronized (regionMaximumEditLogSeqNum) {
-        Long currentMaxSeqNum=regionMaximumEditLogSeqNum.get(entry.getKey().getEncodedRegionName());
-        if (currentMaxSeqNum == null
-            || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
-          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
-              entry.getKey().getLogSeqNum());
+    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir, FileSystem fs,
+        Configuration conf) throws IOException {
+      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+      if (regionedits == null) {
+        return null;
+      }
+      if (fs.exists(regionedits)) {
+        LOG.warn("Found old edits file. It could be the "
+            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+            + fs.getFileStatus(regionedits).getLen());
+        if (!fs.delete(regionedits, false)) {
+          LOG.warn("Failed delete of old " + regionedits);
         }
       }
-
+      Writer w = createWriter(fs, regionedits, conf);
+      LOG.debug("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
+      return (new WriterAndPath(regionedits, w));
     }
 
-    Long getRegionMaximumEditLogSeqNum(byte[] region) {
-      return regionMaximumEditLogSeqNum.get(region);
+    void append(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn("got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+
+      long startTime = System.nanoTime();
+      try {
+        int editsCount = 0;
+
+        for (Entry logEntry : entries) {
+          if (wap == null) {
+            wap = getWriterAndPath(logEntry);
+            if (wap == null) {
+              // getWriterAndPath decided we don't need to write these edits
+              return;
+            }
+          }
+          wap.w.append(logEntry);
+          this.updateRegionMaximumEditLogSeqNum(logEntry);
+          editsCount++;
+        }
+        // Pass along summary statistics
+        wap.incrementEdits(editsCount);
+        wap.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(" Got while writing log entry to log", e);
+        throw e;
+      }
     }
 
     /**
-     * @return a map from encoded region ID to the number of edits written out
-     * for that region.
+     * @return a map from encoded region ID to the number of edits written out for that region.
      */
-    private Map<byte[], Long> getOutputCounts() {
-      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
-          Bytes.BYTES_COMPARATOR);
-      synchronized (logWriters) {
-        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
+    Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      synchronized (writers) {
+        for (Map.Entry<byte[], ? extends SinkWriter> entry : writers.entrySet()) {
           ret.put(entry.getKey(), entry.getValue().editsWritten);
         }
       }
       return ret;
     }
+
+    @Override
+    int getNumberOfRecoveredRegions() {
+      return writers.size();
+    }
   }
 
   /**
-   *  Private data structure that wraps a Writer and its Path,
-   *  also collecting statistics about the data written to this
-   *  output.
+   * Class wraps the actual writer which writes data out and related statistics
    */
-  private final static class WriterAndPath {
-    final Path p;
-    final Writer w;
-
+  private abstract static class SinkWriter {
     /* Count of edits written to this path */
     long editsWritten = 0;
     /* Number of nanos spent writing to this log */
     long nanosSpent = 0;
 
+    void incrementEdits(int edits) {
+      editsWritten += edits;
+    }
+
+    void incrementNanoTime(long nanos) {
+      nanosSpent += nanos;
+    }
+  }
+
+  /**
+   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
+   * data written to this output.
+   */
+  private final static class WriterAndPath extends SinkWriter {
+    final Path p;
+    final Writer w;
+
     WriterAndPath(final Path p, final Writer w) {
       this.p = p;
       this.w = w;
     }
+  }
 
-    void incrementEdits(int edits) {
-      editsWritten += edits;
+  /**
+   * Class that manages to replay edits from WAL files directly to assigned fail over region servers
+   */
+  class LogReplayOutputSink extends OutputSink {
+    private static final double BUFFER_THRESHOLD = 0.35;
+    private static final String KEY_DELIMITER = "#";
+
+    private long waitRegionOnlineTimeOut;
+    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
+    private final Map<String, RegionServerWriter> writers = 
+        new ConcurrentHashMap<String, RegionServerWriter>();
+    // online encoded region name map
+    private final Set<String> onlineRegions = Collections.synchronizedSet(new HashSet<String>());
+
+    private Map<byte[], HConnection> tableNameToHConnectionMap = Collections
+        .synchronizedMap(new TreeMap<byte[], HConnection>(Bytes.BYTES_COMPARATOR));
+    /**
+     * Map key -> value layout:
+     * <server host:port>#<table name> -> List<Pair<HRegionLocation, Row>>
+     */
+    private Map<String, List<Pair<HRegionLocation, Row>>> serverToBufferQueueMap = 
+        new ConcurrentHashMap<String, List<Pair<HRegionLocation, Row>>>();
+    private List<Throwable> thrown = new ArrayList<Throwable>();
+
+    // The following sink is used in distributedLogReplay mode for entries of regions in a
+    // disabling or disabled table. This is a limitation of distributedLogReplay: log replay
+    // requires a region to be assigned and online before its WAL edits can be replayed, while
+    // regions of disabling/disabled tables won't be assigned by AM. We can retire this code
+    // after HBASE-8234.
+    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
+    private boolean hasEditsInDisablingOrDisabledTables = false;
+
+    public LogReplayOutputSink(int numWriters) {
+      super(numWriters);
+
+      this.waitRegionOnlineTimeOut = conf.getInt("hbase.splitlog.manager.timeout", 
+        SplitLogManager.DEFAULT_TIMEOUT);
+      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(numWriters);
+      this.logRecoveredEditsOutputSink.setReporter(reporter);
     }
 
-    void incrementNanoTime(long nanos) {
-      nanosSpent += nanos;
+    void append(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn("got an empty buffer, skipping");
+        return;
+      }
+      
+      // check if current region in a disabling or disabled table
+      if (disablingOrDisabledTables.contains(Bytes.toString(buffer.tableName))) {
+        // need fall back to old way
+        logRecoveredEditsOutputSink.append(buffer);
+        hasEditsInDisablingOrDisabledTables = true;
+        // store regions we have recovered so far
+        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
+        return;
+      }
+
+      // group entries by region servers
+      groupEditsByServer(entries);
+
+      // process workitems
+      String maxLocKey = null;
+      int maxSize = 0;
+      List<Pair<HRegionLocation, Row>> maxQueue = null;
+      synchronized (this.serverToBufferQueueMap) {
+        for (String key : this.serverToBufferQueueMap.keySet()) {
+          List<Pair<HRegionLocation, Row>> curQueue = this.serverToBufferQueueMap.get(key);
+          if (curQueue.size() > maxSize) {
+            maxSize = curQueue.size();
+            maxQueue = curQueue;
+            maxLocKey = key;
+          }
+        }
+        if (maxSize < minBatchSize
+            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
+          // buffer more to process
+          return;
+        } else if (maxSize > 0) {
+          this.serverToBufferQueueMap.remove(maxLocKey);
+        }
+      }
+
+      if (maxSize > 0) {
+        processWorkItems(maxLocKey, maxQueue);
+      }
+    }
+
+    /**
+     * Records the given encoded region name as recovered; a name that is already recorded is
+     * left as is.
+     */
+    private void addToRecoveredRegions(String encodedRegionName) {
+      boolean alreadyRecorded = recoveredRegions.contains(encodedRegionName);
+      if (alreadyRecorded) {
+        return;
+      }
+      recoveredRegions.add(encodedRegionName);
+    }
+
+    /**
+     * Helper function to group WALEntries to individual region servers
+     * @throws IOException
+     */
+    private void groupEditsByServer(List<Entry> entries) throws IOException {
+      Set<byte[]> nonExistentTables = null;
+      Long cachedLastFlushedSequenceId = -1l;
+      for (HLog.Entry entry : entries) {
+        WALEdit edit = entry.getEdit();
+        byte[] table = entry.getKey().getTablename();
+        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
+        // skip edits of non-existent tables
+        if (nonExistentTables != null && nonExistentTables.contains(table)) {
+          this.skippedEdits.incrementAndGet();
+          continue;
+        }
+        boolean needSkip = false;
+        Put put = null;
+        Delete del = null;
+        KeyValue lastKV = null;
+        HRegionLocation loc = null;
+        Row preRow = null;
+        HRegionLocation preLoc = null;
+        Row lastAddedRow = null; // it is not really needed here just be conservative
+        String preKey = null;
+        List<KeyValue> kvs = edit.getKeyValues();
+        HConnection hconn = this.getConnectionByTableName(table);
+
+        for (KeyValue kv : kvs) {
+          // filtering HLog meta entries
+          // We don't handle HBASE-2231 because we may or may not replay a compaction event.
+          // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
+          // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+          if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
+
+          if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+            if (preRow != null) {
+              synchronized (serverToBufferQueueMap) {
+                List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
+                if (queue == null) {
+                  queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
+                  serverToBufferQueueMap.put(preKey, queue);
+                }
+                queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
+                lastAddedRow = preRow;
+              }
+              // store regions we have recovered so far
+              addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+            }
+
+            try {
+              loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow());
+            } catch (TableNotFoundException ex) {
+              // table has been deleted so skip edits of the table
+              LOG.info("Table " + Bytes.toString(table)
+                  + " doesn't exist. Skip log replay for region " + encodeRegionNameStr);
+              lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
+              if (nonExistentTables == null) {
+                nonExistentTables = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+              }
+              nonExistentTables.add(table);
+              this.skippedEdits.incrementAndGet();
+              needSkip = true;
+              break;
+            }
+            cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
+                .getEncodedName());
+            if (cachedLastFlushedSequenceId != null
+                && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+              // skip the whole HLog entry
+              this.skippedEdits.incrementAndGet();
+              needSkip = true;
+              break;
+            }
+            
+            if (kv.isDelete()) {
+              del = new Delete(kv.getRow());
+              del.setClusterId(entry.getKey().getClusterId());
+              preRow = del;
+            } else {
+              put = new Put(kv.getRow());
+              put.setClusterId(entry.getKey().getClusterId());
+              preRow = put;
+            }
+            preKey = loc.getHostnamePort() + KEY_DELIMITER + Bytes.toString(table);
+            preLoc = loc;
+          }
+          if (kv.isDelete()) {
+            del.addDeleteMarker(kv);
+          } else {
+            put.add(kv);
+          }
+          lastKV = kv;
+        }
+
+        // skip the edit
+        if(needSkip) continue;
+        
+        // add the last row
+        if (preRow != null && lastAddedRow != preRow) {
+          synchronized (serverToBufferQueueMap) {
+            List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
+            if (queue == null) {
+              queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
+              serverToBufferQueueMap.put(preKey, queue);
+            }
+            queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
+          }
+          // store regions we have recovered so far
+          addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+        }
+      }
+    }
+
+    /**
+     * Locate destination region based on table name & row. This function also makes sure the
+     * destination region is online for replay.
+     * @throws IOException
+     */
+    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
+        byte[] table, byte[] row) throws IOException {
+      HRegionLocation loc = hconn.getRegionLocation(table, row, false);
+      if (loc == null) {
+        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
+            + " of table:" + Bytes.toString(table));
+      }
+      if (onlineRegions.contains(loc.getRegionInfo().getEncodedName())) {
+        return loc;
+      }
+
+      Long lastFlushedSequenceId = -1l;
+      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut);
+      Long cachedLastFlushedSequenceId = lastFlushedSequenceIds.get(loc.getRegionInfo()
+          .getEncodedName());
+
+      onlineRegions.add(loc.getRegionInfo().getEncodedName());
+      // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
+      // update the value for the region
+      lastFlushedSequenceId = SplitLogManager.getLastFlushedSequenceId(watcher, loc
+          .getServerName().getServerName(), loc.getRegionInfo().getEncodedName());
+      if (cachedLastFlushedSequenceId == null
+          || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
+        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
+      } else if (loc.getRegionInfo().isRecovering() == false) {
+        // region isn't in recovering at all because WAL file may contain a region that has
+        // been moved to somewhere before hosting RS fails
+        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
+        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
+            + " because it's not in recovering.");
+      }
+      
+      return loc;
+    }
+
+    /**
+     * Replays the given actions through the writer registered for the location key and records
+     * the number of actions and the elapsed time on that writer.
+     * @throws IOException if obtaining the writer or replaying the actions fails
+     */
+    private void processWorkItems(String key, List<Pair<HRegionLocation, Row>> actions)
+        throws IOException {
+      final long begin = System.nanoTime();
+      try {
+        final RegionServerWriter writer = getRegionServerWriter(key);
+        writer.sink.replayEntries(actions);
+
+        // Pass along summary statistics
+        writer.incrementEdits(actions.size());
+        writer.incrementNanoTime(System.nanoTime() - begin);
+      } catch (IOException ioe) {
+        IOException unwrapped = RemoteExceptionHandler.checkIOException(ioe);
+        LOG.fatal(" Got while writing log entry to log", unwrapped);
+        throw unwrapped;
+      }
+    }
+
+    /**
+     * Wait until region is online on the destination region server
+     * @param loc last known location of the region
+     * @param row a row of the region, used to look the location up again when it must be reloaded
+     * @param timeout How long to wait, in milliseconds
+     * @return the location of the region once the hosting server returned its region info; the
+     *         recovering flag of the location's region info is refreshed from the server's answer
+     * @throws IOException if the timeout elapses or the waiting thread is interrupted
+     */
+    private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
+        final long timeout)
+        throws IOException { 
+      final long endTime = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+      final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+      boolean reloadLocation = false;
+      byte[] tableName = loc.getRegionInfo().getTableName();
+      int tries = 0;
+      // cause of the most recent failed attempt; attached to the timeout exception below
+      Throwable cause = null;
+      while (endTime > EnvironmentEdgeManager.currentTimeMillis()) {
+        try {
+          // Try and get regioninfo from the hosting server.
+          HConnection hconn = getConnectionByTableName(tableName);
+          if(reloadLocation) {
+            loc = hconn.getRegionLocation(tableName, row, true);
+          }
+          BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
+          HRegionInfo region = loc.getRegionInfo();
+          if((region =ProtobufUtil.getRegionInfo(remoteSvr, region.getRegionName())) != null) {
+            loc.getRegionInfo().setRecovering(region.isRecovering());
+            return loc;
+          }
+        } catch (IOException e) {
+          cause = e.getCause();
+          // any failure other than RegionOpeningException makes later attempts reload the
+          // location; the flag is never reset once set
+          if(!(cause instanceof RegionOpeningException)) {
+            reloadLocation = true;
+          }
+        }
+        // pause length is derived from the configured client pause and the number of tries so far
+        long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
+        try {
+          Thread.sleep(expectedSleep);
+        } catch (InterruptedException e) {
+          // restore the interrupt flag before reporting the interruption as an IOException
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted when waiting regon " + 
+              loc.getRegionInfo().getEncodedName() + " online.", e);
+        }
+        tries++;
+      }
+      
+      throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
+        " online for " + timeout + " milliseconds.", cause);
+    }
+
+    /**
+     * Replays the first non-empty per-server queue, if there is one, and then wakes up all
+     * threads waiting on dataAvailable.
+     * @return true if a queue was found and replayed, false if nothing was buffered
+     * @throws IOException if replaying the queue fails
+     */
+    @Override
+    protected boolean flush() throws IOException {
+      String curLoc = null;
+      int curSize = 0;
+      List<Pair<HRegionLocation, Row>> curQueue = null;
+      synchronized (this.serverToBufferQueueMap) {
+        for (String locationKey : this.serverToBufferQueueMap.keySet()) {
+          curQueue = this.serverToBufferQueueMap.get(locationKey);
+          if (!curQueue.isEmpty()) {
+            curSize = curQueue.size();
+            curLoc = locationKey;
+            break;
+          }
+        }
+        // detach the chosen queue while still holding the lock so no other thread picks it too
+        if (curSize > 0) {
+          this.serverToBufferQueueMap.remove(curLoc);
+        }
+      }
+
+      if (curSize > 0) {
+        this.processWorkItems(curLoc, curQueue);
+        // notifyAll() may only be called by the owner of the monitor. Take the monitor here
+        // (re-entrant if the caller already holds it) so the call cannot fail with
+        // IllegalMonitorStateException.
+        synchronized (dataAvailable) {
+          dataAvailable.notifyAll();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    void addWriterError(Throwable t) {
+      thrown.add(t);
+    }
+
+    @Override
+    List<Path> finishWritingAndClose() throws IOException {
+      List<Path> result = new ArrayList<Path>();
+      try {
+        if (!finishWriting()) {
+          return null;
+        }
+        if (hasEditsInDisablingOrDisabledTables) {
+          result = logRecoveredEditsOutputSink.finishWritingAndClose();
+        }
+        // returns an empty array in order to keep interface same as old way
+        return result;
+      } finally {
+        List<IOException> thrown = closeRegionServerWriters();
+        if (thrown != null && !thrown.isEmpty()) {
+          throw MultipleIOException.createIOException(thrown);
+        }
+      }
+    }
+
+    @Override
+    int getNumOpenWriters() {
+      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
+    }
+
+    /**
+     * Stops the writer threads, then closes every region server writer and every cached
+     * connection. Does nothing once the writers have been marked closed.
+     * @return the IOExceptions hit while closing, or null when the writers were already closed
+     * @throws IOException if interrupted while waiting for a writer thread to stop
+     */
+    private List<IOException> closeRegionServerWriters() throws IOException {
+      if (writersClosed) {
+        return null;
+      }
+
+      List<IOException> failures = Lists.newArrayList();
+      try {
+        for (WriterThread thread : writerThreads) {
+          while (thread.isAlive()) {
+            thread.shouldStop = true;
+            thread.interrupt();
+            try {
+              thread.join(10);
+            } catch (InterruptedException ie) {
+              IOException wrapped = new InterruptedIOException();
+              wrapped.initCause(ie);
+              throw wrapped;
+            }
+          }
+        }
+      } finally {
+        synchronized (writers) {
+          for (Map.Entry<String, RegionServerWriter> writerEntry : writers.entrySet()) {
+            try {
+              writerEntry.getValue().close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close writer for region server:" + writerEntry.getKey(), ioe);
+              failures.add(ioe);
+            }
+          }
+        }
+
+        // close connections
+        synchronized (this.tableNameToHConnectionMap) {
+          for (HConnection connection : this.tableNameToHConnectionMap.values()) {
+            try {
+              connection.close();
+            } catch (IOException ioe) {
+              failures.add(ioe);
+            }
+          }
+        }
+        writersClosed = true;
+      }
+      return failures;
+    }
+
+    /**
+     * @return a map from location key (converted to bytes) to the number of edits written through
+     *         the writer registered for that key.
+     */
+    Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> counts = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      synchronized (writers) {
+        for (Map.Entry<String, RegionServerWriter> writerEntry : writers.entrySet()) {
+          byte[] locationKey = Bytes.toBytes(writerEntry.getKey());
+          long edits = writerEntry.getValue().editsWritten;
+          counts.put(locationKey, edits);
+        }
+      }
+      return counts;
+    }
+
+    @Override
+    int getNumberOfRecoveredRegions() {
+      return this.recoveredRegions.size();
+    }
+
+    /**
+     * Get the writer for the given location key, creating and caching it on first use.
+     * @param loc location key in format <server name:port>#<table name>
+     * @return the cached or newly created writer for the location key
+     * @throws IOException if the connection or the writer can't be created
+     */
+    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
+      // fast path: look the writer up without taking the lock
+      RegionServerWriter ret = writers.get(loc);
+      if (ret != null) {
+        return ret;
+      }
+
+      String tableName = getTableFromLocationStr(loc);
+      // NOTE(review): an unparsable key is only logged; processing continues with an empty
+      // table name
+      if(tableName.isEmpty()){
+        LOG.warn("Invalid location string:" + loc + " found.");
+      }
+
+      HConnection hconn = getConnectionByTableName(Bytes.toBytes(tableName));
+      synchronized (writers) {
+        // look up again under the lock so only one writer is created per location key
+        ret = writers.get(loc);
+        if (ret == null) {
+          ret = new RegionServerWriter(conf, Bytes.toBytes(tableName), hconn);
+          writers.put(loc, ret);
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Returns the connection cached for the given table name, creating and caching a new one
+     * when none exists yet.
+     * @throws IOException if a new connection can't be created
+     */
+    private HConnection getConnectionByTableName(final byte[] tableName) throws IOException {
+      HConnection cached = this.tableNameToHConnectionMap.get(tableName);
+      if (cached != null) {
+        return cached;
+      }
+      synchronized (this.tableNameToHConnectionMap) {
+        // look up again under the lock before creating a connection
+        HConnection connection = this.tableNameToHConnectionMap.get(tableName);
+        if (connection == null) {
+          connection = HConnectionManager.createConnection(conf);
+          this.tableNameToHConnectionMap.put(tableName, connection);
+        }
+        return connection;
+      }
+    }
+    
+    /**
+     * Extracts the table name from a location key in format <server name:port>#<table name>.
+     * @return the table name, or an empty string when the key doesn't split into exactly two
+     *         parts
+     */
+    private String getTableFromLocationStr(String loc) {
+      String[] parts = loc.split(KEY_DELIMITER);
+      return parts.length == 2 ? parts[1] : "";
+    }
+  }
+
+  /**
+   * Private data structure that wraps the replay sink for a receiving RS and collects statistics
+   * (the counters inherited from SinkWriter) about the data written to this newly assigned RS.
+   */
+  private final static class RegionServerWriter extends SinkWriter {
+    // sink through which the edits for this writer are replayed
+    final WALEditsReplaySink sink;
+
+    RegionServerWriter(final Configuration conf, final byte[] tableName, final HConnection conn)
+        throws IOException {
+      this.sink = new WALEditsReplaySink(conf, tableName, conn);
+    }
+
+    // no-op: the body is empty, so closing a writer releases nothing here
+    void close() throws IOException {
     }
   }
 
   static class CorruptedLogFileException extends Exception {
     private static final long serialVersionUID = 1L;
+
     CorruptedLogFileException(String s) {
       super(s);
     }

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Wed May 15 04:25:57 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerNam
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Bytes;
 
 public class HLogUtil {
   static final Log LOG = LogFactory.getLog(HLogUtil.class);
@@ -168,6 +169,37 @@ public class HLogUtil {
   }
 
   /**
+   * This function returns region server name from a log file name which is in either format:
+   * hdfs://<name node>/hbase/.logs/<server name>-splitting/... or hdfs://<name
+   * node>/hbase/.logs/<server name>/...
+   * @param logFile path of a log file, or of the server's log directory itself
+   * @return null if the passed in logFile isn't a valid HLog file path (including a null path or
+   *         a path without a parent)
+   */
+  public static ServerName getServerNameFromHLogDirectoryName(Path logFile) {
+    if (logFile == null) {
+      LOG.warn("Invalid log file path=" + logFile);
+      return null;
+    }
+    Path logDir = logFile.getParent();
+    if (logDir == null) {
+      // a path without a parent can't be below a server's log directory
+      LOG.warn("Invalid log file path=" + logFile);
+      return null;
+    }
+    String logDirName = logDir.getName();
+    if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) {
+      // the parent is the top-level log dir, so logFile itself is the server directory
+      logDir = logFile;
+      logDirName = logDir.getName();
+    }
+    ServerName serverName = null;
+    if (logDirName.endsWith(HLog.SPLITTING_EXT)) {
+      logDirName = logDirName.substring(0, logDirName.length() - HLog.SPLITTING_EXT.length());
+    }
+    try {
+      serverName = ServerName.parseServerName(logDirName);
+    } catch (IllegalArgumentException ex) {
+      serverName = null;
+      LOG.warn("Invalid log file path=" + logFile, ex);
+    }
+    if (serverName != null && serverName.getStartcode() < 0) {
+      LOG.warn("Invalid log file path=" + logFile);
+      return null;
+    }
+    return serverName;
+  }
+
+  /**
    * Returns sorted set of edit files made by wal-log splitter, excluding files
    * with '.temp' suffix.
    *

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALEditsReplay.java Wed May 15 04:25:57 2013
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class used to push numbers about WAL edits replay into the metrics subsystem. Each update
+ * method forwards a single measurement to the {@link MetricsEditsReplaySource} obtained from
+ * {@link CompatibilitySingletonFactory}.
+ */
+@InterfaceAudience.Private
+public class MetricsWALEditsReplay {
+  static final Log LOG = LogFactory.getLog(MetricsWALEditsReplay.class);
+
+  private final MetricsEditsReplaySource source;
+
+  public MetricsWALEditsReplay() {
+    source = CompatibilitySingletonFactory.getInstance(MetricsEditsReplaySource.class);
+  }
+
+  /**
+   * Add the time a replay command took
+   * @param time duration reported by the caller
+   */
+  void updateReplayTime(long time) {
+    source.updateReplayTime(time);
+  }
+
+  /**
+   * Add the batch size of each replay
+   * @param size batch size reported by the caller
+   */
+  void updateReplayBatchSize(long size) {
+    // Previously forwarded to updateReplayDataSize, which recorded batch sizes under the wrong
+    // metric. NOTE(review): assumes MetricsEditsReplaySource declares
+    // updateReplayBatchSize(long) -- confirm against the interface.
+    source.updateReplayBatchSize(size);
+  }
+
+  /**
+   * Add the payload data size of each replay
+   * @param size payload size reported by the caller
+   */
+  void updateReplayDataSize(long size) {
+    // Previously an empty method, so reported data sizes were silently dropped.
+    source.updateReplayDataSize(size);
+  }
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Wed May 15 04:25:57 2013
@@ -83,6 +83,7 @@ public class WALEdit implements Writable
   static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
   static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
   private final int VERSION_2 = -1;
+  private final boolean isReplay;
 
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
 
@@ -93,6 +94,11 @@ public class WALEdit implements Writable
   private CompressionContext compressionContext;
 
   public WALEdit() {
+    // Default: an edit that is not flagged as coming from log replay.
+    this(false);
+  }
+
+  public WALEdit(boolean isReplay) {
+    // Stored in a final field at construction time and exposed through isReplay().
+    this.isReplay = isReplay;
   }
 
   /**
@@ -103,6 +109,14 @@ public class WALEdit implements Writable
     return Bytes.equals(METAFAMILY, f);
   }
 
+  /**
+   * @return True when the current WALEdit was created by log replay, i.e. the flag given to the
+   *         constructor. Replication skips WALEdits from replay.
+   */
+  public boolean isReplay() {
+    return this.isReplay;
+  }
+
   public void setCompressionContext(final CompressionContext compressionContext) {
     this.compressionContext = compressionContext;
   }

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Wed May 15 04:25:57 2013
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is responsible for replaying the edits coming from a failed region server.
+ * <p/>
+ * This class uses the native HBase client in order to replay WAL entries.
+ * <p/>
+ */
+@InterfaceAudience.Private
+public class WALEditsReplaySink {
+
+  private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
+
+  private final Configuration conf;
+  private final HConnection conn;
+  private final byte[] tableName;
+  private final MetricsWALEditsReplay metrics;
+  private final AtomicLong totalReplayedEdits = new AtomicLong();
+  private final boolean skipErrors;
+
+  /**
+   * Create a sink for WAL log entries replay
+   * @param conf configuration; HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS is read from it
+   * @param tableName table name handed to every replay callable
+   * @param conn connection used to reach region servers and to relocate regions on retry
+   * @throws IOException
+   */
+  public WALEditsReplaySink(Configuration conf, byte[] tableName, HConnection conn)
+      throws IOException {
+    this.conf = conf;
+    this.metrics = new MetricsWALEditsReplay();
+    this.conn = conn;
+    this.tableName = tableName;
+    this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
+      HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
+  }
+
+  /**
+   * Replay a batch of actions directly into the newly assigned Region Server(s). Actions are
+   * grouped by region and each group is sent to the location that was supplied together with
+   * that region's actions.
+   * @param actions pairs of (region location, row action) to replay; an empty list is a no-op
+   * @throws IOException if replaying a group fails and errors are not configured to be skipped
+   */
+  public void replayEntries(List<Pair<HRegionLocation, Row>> actions) throws IOException {
+    if (actions.size() == 0) {
+      return;
+    }
+
+    int batchSize = actions.size();
+    // long, so that summing many row-key lengths cannot overflow
+    long dataSize = 0;
+    Map<HRegionInfo, List<Action<Row>>> actionsByRegion =
+        new HashMap<HRegionInfo, List<Action<Row>>>();
+    // Location to use per region. The original code reused the location of the LAST action in
+    // the batch for every region, which targets the wrong server when a batch spans regions
+    // hosted on different servers.
+    Map<HRegionInfo, HRegionLocation> locationByRegion =
+        new HashMap<HRegionInfo, HRegionLocation>();
+    // Build the action list.
+    for (int i = 0; i < batchSize; i++) {
+      Pair<HRegionLocation, Row> entry = actions.get(i);
+      HRegionLocation loc = entry.getFirst();
+      Row row = entry.getSecond();
+      HRegionInfo regionInfo = loc.getRegionInfo();
+      List<Action<Row>> regionActions = actionsByRegion.get(regionInfo);
+      if (regionActions == null) {
+        regionActions = new ArrayList<Action<Row>>();
+        actionsByRegion.put(regionInfo, regionActions);
+      }
+      // Keep the most recently seen location of this region (same as before for a
+      // single-region batch).
+      locationByRegion.put(regionInfo, loc);
+      // The action index is the position of the row in the incoming batch.
+      regionActions.add(new Action<Row>(row, i));
+      dataSize += row.getRow().length;
+    }
+
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+
+    // replaying edits by region
+    for (Map.Entry<HRegionInfo, List<Action<Row>>> group : actionsByRegion.entrySet()) {
+      HRegionInfo curRegion = group.getKey();
+      replayEdits(locationByRegion.get(curRegion), curRegion, group.getValue());
+    }
+
+    long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+    LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + elapsed
+        + "(ms)!");
+
+    metrics.updateReplayTime(elapsed);
+    metrics.updateReplayBatchSize(batchSize);
+    metrics.updateReplayDataSize(dataSize);
+
+    this.totalReplayedEdits.addAndGet(batchSize);
+  }
+
+  /**
+   * Get a string representation of this sink's metrics
+   * @return string with the total replayed edits count, or an empty string if nothing has been
+   *         replayed yet
+   */
+  public String getStats() {
+    return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
+        + this.totalReplayedEdits;
+  }
+
+  /**
+   * Sends the actions of one region to the given location, with retries.
+   * @param regionLoc initial location of the region
+   * @param regionInfo region the actions belong to
+   * @param actions actions to replay, all for regionInfo
+   * @throws IOException if the replay fails and skipErrors is false
+   */
+  private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+      final List<Action<Row>> actions) throws IOException {
+    try {
+      ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
+          this.conn, this.tableName, regionLoc, regionInfo, actions);
+      callable.withRetries();
+    } catch (IOException ie) {
+      if (skipErrors) {
+        // Pass the exception itself so the cause and stack trace are not lost; the message
+        // alone can be null.
+        LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+            + "=true so continuing replayEdits with error:" + ie.getMessage(), ie);
+      } else {
+        throw ie;
+      }
+    }
+  }
+
+  /**
+   * Callable that handles the <code>replay</code> method call going against a single regionserver
+   * @param <R>
+   */
+  class ReplayServerCallable<R> extends ServerCallable<MultiResponse> {
+    private final HRegionInfo regionInfo;
+    private final List<Action<Row>> actions;
+
+    ReplayServerCallable(final HConnection connection, final byte [] tableName,
+        final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+        final List<Action<Row>> actions) {
+      super(connection, tableName, null);
+      this.actions = actions;
+      this.regionInfo = regionInfo;
+      this.location = regionLoc;
+    }
+
+    /**
+     * Replays the actions to the current location.
+     * @return always null; per-action failures are surfaced as exceptions instead
+     */
+    @Override
+    public MultiResponse call() throws IOException {
+      try {
+        replayToServer(this.regionInfo, this.actions);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+      return null;
+    }
+
+    /**
+     * Issues the replay RPC and inspects each per-action result for an exception.
+     */
+    private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions)
+        throws IOException, ServiceException {
+      AdminService.BlockingInterface remoteSvr = connection.getAdmin(location.getServerName());
+      MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(),
+        actions);
+      MultiResponse protoResults = remoteSvr.replay(null, request);
+      // check if it's a partial success
+      List<ActionResult> resultList = protoResults.getResultList();
+      for (int i = 0, n = resultList.size(); i < n; i++) {
+        ActionResult result = resultList.get(i);
+        if (result.hasException()) {
+          Throwable t = ProtobufUtil.toException(result.getException());
+          if (!skipErrors) {
+            // retry; wrap with the cause so the resulting message is not null
+            throw new IOException(t);
+          } else {
+            LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+                + "=true so continuing replayToServer with error:" + t.getMessage());
+            return;
+          }
+        }
+      }
+    }
+
+    @Override
+    public void prepare(boolean reload) throws IOException {
+      if (!reload || actions.isEmpty()) {
+        return;
+      }
+
+      // relocate regions in case we have a new dead server or network hiccup
+      // if not due to connection issue, the following code should run fast because it uses
+      // cached location
+      // use first row to relocate region because all actions are for one region
+      Row firstRow = actions.get(0).getAction();
+      this.location = this.connection.locateRegion(tableName, firstRow.getRow());
+    }
+  }
+
+}

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java?rev=1482676&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java Wed May 15 04:25:57 2013
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Watcher used to be notified of the recovering region coming out of recovering state
+ */
+@InterfaceAudience.Private
+public class RecoveringRegionWatcher extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
+
+  private final HRegionServer server;
+
+  /**
+   * Construct a ZooKeeper event listener and register it with the given watcher.
+   * @param watcher watcher this listener registers itself with
+   * @param server region server whose recovering regions are updated on znode deletion
+   */
+  public RecoveringRegionWatcher(ZooKeeperWatcher watcher, HRegionServer server) {
+    super(watcher);
+    // Assign state before registering, so a callback can never observe a null server.
+    this.server = server;
+    watcher.registerListener(this);
+  }
+
+  /**
+   * Called when a node has been deleted. If the node is a direct child of the recovering-regions
+   * znode, the matching region (if this server tracks one) is taken out of recovering state.
+   * @param path full path of the deleted node
+   */
+  @Override
+  public void nodeDeleted(String path) {
+    if (this.server.isStopped() || this.server.isStopping()) {
+      return;
+    }
+
+    if (!isRecoveringRegionNode(path)) {
+      return;
+    }
+
+    // The last path component is the region name used as key in the recovering-regions map.
+    String regionName = path.substring(path.lastIndexOf('/') + 1);
+    HRegion region = this.server.getRecoveringRegions().remove(regionName);
+    if (region != null) {
+      region.setRecovering(false);
+    }
+
+    LOG.info(path + " znode deleted. Region: " + regionName + " completes recovery.");
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    registerWatcher(path);
+  }
+
+  @Override
+  public void nodeChildrenChanged(String path) {
+    registerWatcher(path);
+  }
+
+  /**
+   * Reinstall watcher because watcher only fire once though we're only interested in nodeDeleted
+   * event we need to register the watcher in case other event happens
+   */
+  private void registerWatcher(String path) {
+    if (!isRecoveringRegionNode(path)) {
+      return;
+    }
+
+    try {
+      ZKUtil.getDataAndWatch(watcher, path);
+    } catch (KeeperException e) {
+      LOG.warn("Can't register watcher on znode " + path, e);
+    }
+  }
+
+  /**
+   * @param path znode path to test
+   * @return true if the parent of path is the recovering-regions znode; false otherwise,
+   *         including when path contains no '/' separator at all
+   */
+  private boolean isRecoveringRegionNode(String path) {
+    int lastSlash = path.lastIndexOf('/');
+    if (lastSlash < 0) {
+      // Without this guard substring(0, -1) would throw StringIndexOutOfBoundsException.
+      return false;
+    }
+    String parentPath = path.substring(0, lastSlash);
+    return this.watcher.recoveringRegionsZNode.equalsIgnoreCase(parentPath);
+  }
+}

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java Wed May 15 04:25:57 2013
@@ -217,6 +217,7 @@ public class TestIOFencing {
 
   public void doTest(Class<?> regionClass) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
+    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Wed May 15 04:25:57 2013
@@ -58,7 +58,7 @@ public class TestRegionServerCoprocessor
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);  // Let's fail fast.
     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName());
     conf.set("hbase.coprocessor.abortonerror", "true");
-    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.startMiniCluster(3);
   }
 
   @AfterClass

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Wed May 15 04:25:57 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
@@ -520,4 +521,17 @@ ClientProtos.ClientService.BlockingInter
   public ExecutorService getExecutorService() {
     return null;
   }
+
+  @Override
+  public MultiResponse replay(RpcController controller, MultiRequest request)
+      throws ServiceException {
+    // Stub: replay is not implemented by this mock; it ignores the request and returns null.
+    return null;
+  }
+
+  @Override
+  public Map<String, HRegion> getRecoveringRegions() {
+    // Stub: this mock tracks no recovering regions and returns null (not an empty map).
+    return null;
+  }
 }

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1482676&r1=1482675&r2=1482676&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Wed May 15 04:25:57 2013
@@ -331,6 +331,12 @@ public class TestCatalogJanitor {
     public void dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b,
         boolean forcible) throws IOException {
     }
+
+    @Override
+    public boolean isInitialized() {
+      // Stub: always returns false.
+      return false;
+    }
   }
 
   @Test