You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/08 01:43:30 UTC

[3/5] hbase git commit: HBASE-19128 Purge Distributed Log Replay from codebase, configurations, text; mark the feature as unsupported, broken.

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 215d2ed..39063a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -18,9 +18,7 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -32,7 +30,6 @@ import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +47,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -67,26 +63,13 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
-import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -95,24 +78,15 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -122,11 +96,12 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
- * files that are no longer being written to, into new files, one per region for
- * region to replay on startup. Delete the old log files when finished.
+ * files that are no longer being written to, into new files, one per region, for
+ * recovering data on startup. Delete the old log files when finished.
  */
 @InterfaceAudience.Private
 public class WALSplitter {
@@ -142,14 +117,10 @@ public class WALSplitter {
 
   // Major subcomponents of the split process.
   // These are separated into inner classes to make testing easier.
-  PipelineController controller;
   OutputSink outputSink;
-  EntryBuffers entryBuffers;
+  private EntryBuffers entryBuffers;
 
-  private Map<TableName, TableState> tableStatesCache =
-      new ConcurrentHashMap<>();
   private SplitLogWorkerCoordination splitLogWorkerCoordination;
-  private Connection connection;
   private final WALFactory walFactory;
 
   private MonitoredTask status;
@@ -157,31 +128,19 @@ public class WALSplitter {
   // For checking the latest flushed sequence id
   protected final LastSequenceId sequenceIdChecker;
 
-  protected boolean distributedLogReplay;
-
   // Map encodedRegionName -> lastFlushedSequenceId
   protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();
 
   // Map encodedRegionName -> maxSeqIdInStores
   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
 
-  // Failed region server that the wal file being split belongs to
-  protected String failedServerName = "";
-
-  // Number of writer threads
-  private final int numWriterThreads;
-
-  // Min batch size when replay WAL edits
-  private final int minBatchSize;
-
   // the file being split currently
   private FileStatus fileBeingSplit;
 
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
-      SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
-      RecoveryMode mode) {
+      SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -190,31 +149,15 @@ public class WALSplitter {
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
-    this.connection = connection;
 
     this.walFactory = factory;
-    this.controller = new PipelineController();
+    PipelineController controller = new PipelineController();
 
     entryBuffers = new EntryBuffers(controller,
-        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
-            128*1024*1024));
-
-    // a larger minBatchSize may slow down recovery because replay writer has to wait for
-    // enough edits before replaying them
-    this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
-    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
-
-    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    if (this.splitLogWorkerCoordination != null && this.distributedLogReplay) {
-      outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
-    } else {
-      if (this.distributedLogReplay) {
-        LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
-      }
-      this.distributedLogReplay = false;
-      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
-    }
+        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
 
+    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
   }
 
   /**
@@ -227,10 +170,10 @@ public class WALSplitter {
    */
   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      SplitLogWorkerCoordination splitLogWorkerCoordination, Connection connection,
-      RecoveryMode mode, final WALFactory factory) throws IOException {
+      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
+      throws IOException {
     WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
-        splitLogWorkerCoordination, connection, mode);
+        splitLogWorkerCoordination);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -246,8 +189,7 @@ public class WALSplitter {
     List<Path> splits = new ArrayList<>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, null,
-            RecoveryMode.LOG_SPLITTING);
+        WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -281,22 +223,20 @@ public class WALSplitter {
     int editsCount = 0;
     int editsSkipped = 0;
 
-    status =
-        TaskMonitor.get().createStatus(
+    status = TaskMonitor.get().createStatus(
           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
-    Reader in = null;
+    Reader logFileReader = null;
     this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
-      LOG.info("Splitting WAL=" + logPath + ", length=" + logLength +
-          ", distributedLogReplay=" + this.distributedLogReplay);
+      LOG.info("Splitting WAL=" + logPath + ", length=" + logLength);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
         return false;
       }
-      in = getReader(logfile, skipErrors, reporter);
-      if (in == null) {
+      logFileReader = getReader(logfile, skipErrors, reporter);
+      if (logFileReader == null) {
         LOG.warn("Nothing to split in WAL=" + logPath);
         return true;
       }
@@ -307,26 +247,12 @@ public class WALSplitter {
       outputSinkStarted = true;
       Entry entry;
       Long lastFlushedSequenceId = -1L;
-      // THIS IS BROKEN!!!! GETTING SERVERNAME FROM PATH IS NOT GOING TO WORK IF LAYOUT CHANGES!!!
-      // TODO: Fix.
-      ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logPath);
-      failedServerName = (serverName == null) ? "" : serverName.getServerName();
-      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
+      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
         String encodedRegionNameAsStr = Bytes.toString(region);
         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
-          if (this.distributedLogReplay) {
-            RegionStoreSequenceIds ids = splitLogWorkerCoordination.getRegionFlushedSequenceId(
-                failedServerName, encodedRegionNameAsStr);
-            if (ids != null) {
-              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
-                  TextFormat.shortDebugString(ids));
-              }
-            }
-          } else if (sequenceIdChecker != null) {
+          if (sequenceIdChecker != null) {
             RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
             Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
             for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
@@ -390,8 +316,8 @@ public class WALSplitter {
     } finally {
       LOG.debug("Finishing writing output logs and closing down.");
       try {
-        if (null != in) {
-          in.close();
+        if (null != logFileReader) {
+          logFileReader.close();
         }
       } catch (IOException exception) {
         LOG.warn("Could not close WAL reader: " + exception.getMessage());
@@ -954,7 +880,7 @@ public class WALSplitter {
     }
 
     /**
-     * @return RegionEntryBuffer a buffer of edits to be written or replayed.
+     * @return RegionEntryBuffer a buffer of edits to be written.
      */
     synchronized RegionEntryBuffer getChunkToWrite() {
       long biggestSize = 0;
@@ -1127,8 +1053,8 @@ public class WALSplitter {
   }
 
   /**
-   * The following class is an abstraction class to provide a common interface to support both
-   * existing recovered edits file sink and region server WAL edits replay sink
+   * The following class is an abstraction class to provide a common interface to support
+   * different ways of consuming recovered edits.
    */
   public static abstract class OutputSink {
 
@@ -1195,10 +1121,6 @@ public class WALSplitter {
       }
     }
 
-    Long getRegionMaximumEditLogSeqNum(byte[] region) {
-      return regionMaximumEditLogSeqNum.get(region);
-    }
-
     /**
      * @return the number of currently opened writers
      */
@@ -1692,583 +1614,6 @@ public class WALSplitter {
     }
   }
 
-  /**
-   * Class that manages to replay edits from WAL files directly to assigned fail over region servers
-   */
-  class LogReplayOutputSink extends OutputSink {
-    private static final double BUFFER_THRESHOLD = 0.35;
-    private static final String KEY_DELIMITER = "#";
-
-    private final long waitRegionOnlineTimeOut;
-    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
-    private final Map<String, RegionServerWriter> writers = new ConcurrentHashMap<>();
-    // online encoded region name -> region location map
-    private final Map<String, HRegionLocation> onlineRegions = new ConcurrentHashMap<>();
-
-    private final Map<TableName, ClusterConnection> tableNameToHConnectionMap = Collections
-        .synchronizedMap(new TreeMap<TableName, ClusterConnection>());
-    /**
-     * Map key -> value layout
-     * {@literal <servername>:<table name> -> Queue<Row>}
-     */
-    private final Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
-        new ConcurrentHashMap<>();
-    private final List<Throwable> thrown = new ArrayList<>();
-
-    // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
-    // table. It's a limitation of distributedLogReplay. Because log replay needs a region is
-    // assigned and online before it can replay wal edits while regions of disabling/disabled table
-    // won't be assigned by AM. We can retire this code after HBASE-8234.
-    private final LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
-    private boolean hasEditsInDisablingOrDisabledTables = false;
-
-    public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
-        int numWriters) {
-      super(controller, entryBuffers, numWriters);
-      this.waitRegionOnlineTimeOut =
-          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
-            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
-      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
-        entryBuffers, numWriters);
-      this.logRecoveredEditsOutputSink.setReporter(reporter);
-    }
-
-    @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
-      List<Entry> entries = buffer.entryBuffer;
-      if (entries.isEmpty()) {
-        LOG.warn("got an empty buffer, skipping");
-        return;
-      }
-
-      // check if current region in a disabling or disabled table
-      if (isTableDisabledOrDisabling(buffer.tableName)) {
-        // need fall back to old way
-        logRecoveredEditsOutputSink.append(buffer);
-        hasEditsInDisablingOrDisabledTables = true;
-        // store regions we have recovered so far
-        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
-        return;
-      }
-
-      // group entries by region servers
-      groupEditsByServer(entries);
-
-      // process workitems
-      String maxLocKey = null;
-      int maxSize = 0;
-      List<Pair<HRegionLocation, Entry>> maxQueue = null;
-      synchronized (this.serverToBufferQueueMap) {
-        for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
-            this.serverToBufferQueueMap.entrySet()) {
-          List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
-          if (curQueue.size() > maxSize) {
-            maxSize = curQueue.size();
-            maxQueue = curQueue;
-            maxLocKey = entry.getKey();
-          }
-        }
-        if (maxSize < minBatchSize
-            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
-          // buffer more to process
-          return;
-        } else if (maxSize > 0) {
-          this.serverToBufferQueueMap.remove(maxLocKey);
-        }
-      }
-
-      if (maxSize > 0) {
-        processWorkItems(maxLocKey, maxQueue);
-      }
-    }
-
-    private void addToRecoveredRegions(String encodedRegionName) {
-      if (!recoveredRegions.contains(encodedRegionName)) {
-        recoveredRegions.add(encodedRegionName);
-      }
-    }
-
-    /**
-     * Helper function to group WALEntries to individual region servers
-     * @throws IOException
-     */
-    private void groupEditsByServer(List<Entry> entries) throws IOException {
-      Set<TableName> nonExistentTables = null;
-      Long cachedLastFlushedSequenceId = -1l;
-      for (Entry entry : entries) {
-        WALEdit edit = entry.getEdit();
-        TableName table = entry.getKey().getTablename();
-        // clear scopes which isn't needed for recovery
-        entry.getKey().serializeReplicationScope(false);
-        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
-        // skip edits of non-existent tables
-        if (nonExistentTables != null && nonExistentTables.contains(table)) {
-          this.skippedEdits.incrementAndGet();
-          continue;
-        }
-
-        Map<byte[], Long> maxStoreSequenceIds = null;
-        boolean needSkip = false;
-        HRegionLocation loc = null;
-        String locKey = null;
-        List<Cell> cells = edit.getCells();
-        List<Cell> skippedCells = new ArrayList<>();
-        ClusterConnection cconn = this.getConnectionByTableName(table);
-
-        for (Cell cell : cells) {
-          byte[] row = CellUtil.cloneRow(cell);
-          byte[] family = CellUtil.cloneFamily(cell);
-          boolean isCompactionEntry = false;
-          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
-            if (compaction != null && compaction.hasRegionName()) {
-              try {
-                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
-                  .toByteArray());
-                row = regionName[1]; // startKey of the region
-                family = compaction.getFamilyName().toByteArray();
-                isCompactionEntry = true;
-              } catch (Exception ex) {
-                LOG.warn("Unexpected exception received, ignoring " + ex);
-                skippedCells.add(cell);
-                continue;
-              }
-            } else {
-              skippedCells.add(cell);
-              continue;
-            }
-          }
-
-          try {
-            loc =
-                locateRegionAndRefreshLastFlushedSequenceId(cconn, table, row,
-                  encodeRegionNameStr);
-            // skip replaying the compaction if the region is gone
-            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
-              loc.getRegionInfo().getEncodedName())) {
-              LOG.info("Not replaying a compaction marker for an older region: "
-                  + encodeRegionNameStr);
-              needSkip = true;
-            }
-          } catch (TableNotFoundException ex) {
-            // table has been deleted so skip edits of the table
-            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
-                + encodeRegionNameStr);
-            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
-            if (nonExistentTables == null) {
-              nonExistentTables = new TreeSet<>();
-            }
-            nonExistentTables.add(table);
-            this.skippedEdits.incrementAndGet();
-            needSkip = true;
-            break;
-          }
-
-          cachedLastFlushedSequenceId =
-              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
-          if (cachedLastFlushedSequenceId != null
-              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
-            // skip the whole WAL entry
-            this.skippedEdits.incrementAndGet();
-            needSkip = true;
-            break;
-          } else {
-            if (maxStoreSequenceIds == null) {
-              maxStoreSequenceIds =
-                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
-            }
-            if (maxStoreSequenceIds != null) {
-              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
-              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getSequenceId()) {
-                // skip current kv if column family doesn't exist anymore or already flushed
-                skippedCells.add(cell);
-                continue;
-              }
-            }
-          }
-        }
-
-        // skip the edit
-        if (loc == null || needSkip) continue;
-
-        if (!skippedCells.isEmpty()) {
-          cells.removeAll(skippedCells);
-        }
-
-        synchronized (serverToBufferQueueMap) {
-          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
-          List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
-          if (queue == null) {
-            queue =
-                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
-            serverToBufferQueueMap.put(locKey, queue);
-          }
-          queue.add(new Pair<>(loc, entry));
-        }
-        // store regions we have recovered so far
-        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
-      }
-    }
-
-    /**
-     * Locate destination region based on table name & row. This function also makes sure the
-     * destination region is online for replay.
-     * @throws IOException
-     */
-    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(ClusterConnection cconn,
-        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
-      // fetch location from cache
-      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
-      if(loc != null) return loc;
-      // fetch location from hbase:meta directly without using cache to avoid hit old dead server
-      loc = cconn.getRegionLocation(table, row, true);
-      if (loc == null) {
-        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
-            + " of table:" + table);
-      }
-      // check if current row moves to a different region due to region merge/split
-      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
-        // originalEncodedRegionName should have already flushed
-        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
-        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
-        if (tmpLoc != null) return tmpLoc;
-      }
-
-      Long lastFlushedSequenceId = -1L;
-      AtomicBoolean isRecovering = new AtomicBoolean(true);
-      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
-      if (!isRecovering.get()) {
-        // region isn't in recovering at all because WAL file may contain a region that has
-        // been moved to somewhere before hosting RS fails
-        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
-        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
-            + " because it's not in recovering.");
-      } else {
-        Long cachedLastFlushedSequenceId =
-            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
-
-        // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
-        // update the value for the region
-        RegionStoreSequenceIds ids =
-            splitLogWorkerCoordination.getRegionFlushedSequenceId(failedServerName,
-              loc.getRegionInfo().getEncodedName());
-        if (ids != null) {
-          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
-          Map<byte[], Long> storeIds = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
-          for (StoreSequenceId id : maxSeqIdInStores) {
-            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
-          }
-          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
-        }
-
-        if (cachedLastFlushedSequenceId == null
-            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
-          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
-        }
-      }
-
-      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
-      return loc;
-    }
-
-    private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
-        throws IOException {
-      RegionServerWriter rsw = null;
-
-      long startTime = System.nanoTime();
-      try {
-        rsw = getRegionServerWriter(key);
-        rsw.sink.replayEntries(actions);
-
-        // Pass along summary statistics
-        rsw.incrementEdits(actions.size());
-        rsw.incrementNanoTime(System.nanoTime() - startTime);
-      } catch (IOException e) {
-        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-        LOG.fatal(" Got while writing log entry to log", e);
-        throw e;
-      }
-    }
-
-    /**
-     * Wait until region is online on the destination region server
-     * @param loc
-     * @param row
-     * @param timeout How long to wait
-     * @param isRecovering Recovering state of the region interested on destination region server.
-     * @return True when region is online on the destination region server
-     * @throws InterruptedException
-     */
-    private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
-        final long timeout, AtomicBoolean isRecovering)
-        throws IOException {
-      final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
-      final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      boolean reloadLocation = false;
-      TableName tableName = loc.getRegionInfo().getTable();
-      int tries = 0;
-      Throwable cause = null;
-      while (endTime > EnvironmentEdgeManager.currentTime()) {
-        try {
-          // Try and get regioninfo from the hosting server.
-          ClusterConnection cconn = getConnectionByTableName(tableName);
-          if(reloadLocation) {
-            loc = cconn.getRegionLocation(tableName, row, true);
-          }
-          BlockingInterface remoteSvr = cconn.getAdmin(loc.getServerName());
-          HRegionInfo region = loc.getRegionInfo();
-          try {
-            GetRegionInfoRequest request =
-                RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
-            GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
-            if (HRegionInfo.convert(response.getRegionInfo()) != null) {
-              isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
-              return loc;
-            }
-          } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
-            throw ProtobufUtil.handleRemoteException(e);
-          }
-        } catch (IOException e) {
-          cause = e.getCause();
-          if(!(cause instanceof RegionOpeningException)) {
-            reloadLocation = true;
-          }
-        }
-        long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
-        try {
-          Thread.sleep(expectedSleep);
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted when waiting region " +
-              loc.getRegionInfo().getEncodedName() + " online.", e);
-        }
-        tries++;
-      }
-
-      throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
-        " online for " + timeout + " milliseconds.", cause);
-    }
-
-    @Override
-    public boolean flush() throws IOException {
-      String curLoc = null;
-      int curSize = 0;
-      List<Pair<HRegionLocation, Entry>> curQueue = null;
-      synchronized (this.serverToBufferQueueMap) {
-        for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
-                this.serverToBufferQueueMap.entrySet()) {
-          curQueue = entry.getValue();
-          if (!curQueue.isEmpty()) {
-            curSize = curQueue.size();
-            curLoc = entry.getKey();
-            break;
-          }
-        }
-        if (curSize > 0) {
-          this.serverToBufferQueueMap.remove(curLoc);
-        }
-      }
-
-      if (curSize > 0) {
-        this.processWorkItems(curLoc, curQueue);
-        // We should already have control of the monitor; ensure this is the case.
-        synchronized(controller.dataAvailable) {
-          controller.dataAvailable.notifyAll();
-        }
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public boolean keepRegionEvent(Entry entry) {
-      return true;
-    }
-
-    void addWriterError(Throwable t) {
-      thrown.add(t);
-    }
-
-    @Override
-    public List<Path> finishWritingAndClose() throws IOException {
-      try {
-        if (!finishWriting(false)) {
-          return null;
-        }
-        if (hasEditsInDisablingOrDisabledTables) {
-          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
-        } else {
-          splits = new ArrayList<>();
-        }
-        // returns an empty array in order to keep interface same as old way
-        return splits;
-      } finally {
-        List<IOException> thrown = closeRegionServerWriters();
-        if (thrown != null && !thrown.isEmpty()) {
-          throw MultipleIOException.createIOException(thrown);
-        }
-      }
-    }
-
-    @Override
-    int getNumOpenWriters() {
-      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
-    }
-
-    private List<IOException> closeRegionServerWriters() throws IOException {
-      List<IOException> result = null;
-      if (!writersClosed) {
-        result = Lists.newArrayList();
-        try {
-          for (WriterThread t : writerThreads) {
-            while (t.isAlive()) {
-              t.shouldStop = true;
-              t.interrupt();
-              try {
-                t.join(10);
-              } catch (InterruptedException e) {
-                IOException iie = new InterruptedIOException();
-                iie.initCause(e);
-                throw iie;
-              }
-            }
-          }
-        } finally {
-          synchronized (writers) {
-            for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
-              RegionServerWriter tmpW = entry.getValue();
-              try {
-                tmpW.close();
-              } catch (IOException ioe) {
-                LOG.error("Couldn't close writer for region server:" + entry.getKey(), ioe);
-                result.add(ioe);
-              }
-            }
-          }
-
-          // close connections
-          synchronized (this.tableNameToHConnectionMap) {
-            for (Map.Entry<TableName, ClusterConnection> entry :
-                    this.tableNameToHConnectionMap.entrySet()) {
-              ClusterConnection cconn = entry.getValue();
-              try {
-                cconn.clearRegionCache();
-                cconn.close();
-              } catch (IOException ioe) {
-                result.add(ioe);
-              }
-            }
-          }
-          writersClosed = true;
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public Map<byte[], Long> getOutputCounts() {
-      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      synchronized (writers) {
-        for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
-          ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
-        }
-      }
-      return ret;
-    }
-
-    @Override
-    public int getNumberOfRecoveredRegions() {
-      return this.recoveredRegions.size();
-    }
-
-    private boolean isTableDisabledOrDisabling(TableName tableName) {
-      if (connection == null)
-        return false; // we can't get state without CoordinatedStateManager
-      if (tableName.isSystemTable())
-        return false; // assume that system tables never can be disabled
-      TableState tableState = tableStatesCache.get(tableName);
-      if (tableState == null) {
-        try {
-          tableState = MetaTableAccessor.getTableState(connection, tableName);
-          if (tableState != null)
-            tableStatesCache.put(tableName, tableState);
-        } catch (IOException e) {
-          LOG.warn("State is not accessible for table " + tableName, e);
-        }
-      }
-      return tableState != null && tableState
-          .inStates(TableState.State.DISABLED, TableState.State.DISABLING);
-    }
-
-    /**
-     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
-     * long as multiple threads are always acting on different regions.
-     * @return null if this region shouldn't output any logs
-     */
-    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
-      RegionServerWriter ret = writers.get(loc);
-      if (ret != null) {
-        return ret;
-      }
-
-      TableName tableName = getTableFromLocationStr(loc);
-      if(tableName == null){
-        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
-      }
-
-      ClusterConnection hconn = getConnectionByTableName(tableName);
-      synchronized (writers) {
-        ret = writers.get(loc);
-        if (ret == null) {
-          ret = new RegionServerWriter(conf, tableName, hconn);
-          writers.put(loc, ret);
-        }
-      }
-      return ret;
-    }
-
-    private ClusterConnection getConnectionByTableName(final TableName tableName) throws IOException {
-      ClusterConnection cconn = this.tableNameToHConnectionMap.get(tableName);
-      if (cconn == null) {
-        synchronized (this.tableNameToHConnectionMap) {
-          cconn = this.tableNameToHConnectionMap.get(tableName);
-          if (cconn == null) {
-            cconn = (ClusterConnection) ConnectionFactory.createConnection(conf);
-            this.tableNameToHConnectionMap.put(tableName, cconn);
-          }
-        }
-      }
-      return cconn;
-    }
-    private TableName getTableFromLocationStr(String loc) {
-      /**
-       * location key is in format {@literal <server name:port>#<table name>}
-       */
-      String[] splits = loc.split(KEY_DELIMITER);
-      if (splits.length != 2) {
-        return null;
-      }
-      return TableName.valueOf(splits[1]);
-    }
-  }
-
-  /**
-   * Private data structure that wraps a receiving RS and collecting statistics about the data
-   * written to this newly assigned RS.
-   */
-  private final static class RegionServerWriter extends SinkWriter {
-    final WALEditsReplaySink sink;
-
-    RegionServerWriter(final Configuration conf, final TableName tableName, final ClusterConnection conn)
-        throws IOException {
-      this.sink = new WALEditsReplaySink(conf, tableName, conn);
-    }
-
-    void close() throws IOException {
-    }
-  }
-
   static class CorruptedLogFileException extends Exception {
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
deleted file mode 100644
index 16485ee..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoveringRegionWatcher.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Watcher used to be notified of the recovering region coming out of recovering state
- */
-@InterfaceAudience.Private
-public class RecoveringRegionWatcher extends ZooKeeperListener {
-  private static final Log LOG = LogFactory.getLog(RecoveringRegionWatcher.class);
-
-  private HRegionServer server;
-
-  /**
-   * Construct a ZooKeeper event listener.
-   */
-  public RecoveringRegionWatcher(ZooKeeperWatcher watcher, HRegionServer server) {
-    super(watcher);
-    watcher.registerListener(this);
-    this.server = server;
-  }
-
-  /**
-   * Called when a node has been deleted
-   * @param path full path of the deleted node
-   */
-  @Override
-  public void nodeDeleted(String path) {
-    if (this.server.isStopped() || this.server.isStopping()) {
-      return;
-    }
-
-    String parentPath = path.substring(0, path.lastIndexOf('/'));
-    if (!this.watcher.znodePaths.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
-      return;
-    }
-
-    String regionName = path.substring(parentPath.length() + 1);
-
-    server.getExecutorService().submit(new FinishRegionRecoveringHandler(server, regionName, path));
-  }
-
-  @Override
-  public void nodeDataChanged(String path) {
-    registerWatcher(path);
-  }
-
-  @Override
-  public void nodeChildrenChanged(String path) {
-    registerWatcher(path);
-  }
-
-  /**
-   * Reinstall watcher because watcher only fire once though we're only interested in nodeDeleted
-   * event we need to register the watcher in case other event happens
-   */
-  private void registerWatcher(String path) {
-    String parentPath = path.substring(0, path.lastIndexOf('/'));
-    if (!this.watcher.znodePaths.recoveringRegionsZNode.equalsIgnoreCase(parentPath)) {
-      return;
-    }
-
-    try {
-      ZKUtil.getDataAndWatch(watcher, path);
-    } catch (KeeperException e) {
-      LOG.warn("Can't register watcher on znode " + path, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 05cd8a2..30e988f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -18,11 +18,9 @@
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,12 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.zookeeper.KeeperException;
 
 /**
- * Common methods and attributes used by {@link org.apache.hadoop.hbase.master.SplitLogManager} 
+ * Common methods and attributes used by {@link org.apache.hadoop.hbase.master.SplitLogManager}
  * and {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
  * running distributed splitting of WAL logs.
  */
@@ -104,20 +99,10 @@ public class ZKSplitLog {
     return true;
   }
 
-  public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
-    String dirname = path.substring(0, path.lastIndexOf('/'));
-    return dirname.equals(zkw.znodePaths.splitLogZNode);
-  }
-
   public static Path getSplitLogDir(Path rootdir, String tmpname) {
     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
   }
 
-
-  public static String getSplitLogDirTmpComponent(final String worker, String file) {
-    return worker + "_" + ZKSplitLog.encode(file);
-  }
-
   public static void markCorrupted(Path rootdir, String logFileName,
       FileSystem fs) {
     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
@@ -136,101 +121,4 @@ public class ZKSplitLog {
     isCorrupt = fs.exists(file);
     return isCorrupt;
   }
-
-  /*
-   * Following methods come from SplitLogManager
-   */
-
-  /**
-   * check if /hbase/recovering-regions/&lt;current region encoded name&gt;
-   * exists. Returns true if exists and set watcher as well.
-   * @param zkw
-   * @param regionEncodedName region encode name
-   * @return true when /hbase/recovering-regions/&lt;current region encoded name&gt; exists
-   * @throws KeeperException
-   */
-  public static boolean
-      isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
-          throws KeeperException {
-    boolean result = false;
-    String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, regionEncodedName);
-
-    byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
-    if (node != null) {
-      result = true;
-    }
-    return result;
-  }
-
-  /**
-   * @param bytes - Content of a failed region server or recovering region znode.
-   * @return long - The last flushed sequence Id for the region server
-   */
-  public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
-    long lastRecordedFlushedSequenceId = -1l;
-    try {
-      lastRecordedFlushedSequenceId = ZKUtil.parseWALPositionFrom(bytes);
-    } catch (DeserializationException e) {
-      lastRecordedFlushedSequenceId = -1l;
-      LOG.warn("Can't parse last flushed sequence Id", e);
-    }
-    return lastRecordedFlushedSequenceId;
-  }
-
-  public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
-    try {
-      if (regions == null) {
-        // remove all children under /home/recovering-regions
-        LOG.debug("Garbage collecting all recovering region znodes");
-        ZKUtil.deleteChildrenRecursively(watcher, watcher.znodePaths.recoveringRegionsZNode);
-      } else {
-        for (String curRegion : regions) {
-          String nodePath = ZKUtil.joinZNode(watcher.znodePaths.recoveringRegionsZNode, curRegion);
-          ZKUtil.deleteNodeRecursively(watcher, nodePath);
-        }
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
-    }
-  }
-
-  /**
-   * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
-   * @param zkw
-   * @param serverName
-   * @param encodedRegionName
-   * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
-   * @throws IOException
-   */
-
-  public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
-      String serverName, String encodedRegionName) throws IOException {
-    // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
-    // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
-    // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
-    // sequence Id name space (sequence Id only valid for a particular RS instance), changes
-    // when different newly assigned RS flushes the region.
-    // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
-    // last flushed sequence Id for each failed RS instance.
-    RegionStoreSequenceIds result = null;
-    String nodePath = ZKUtil.joinZNode(zkw.znodePaths.recoveringRegionsZNode, encodedRegionName);
-    nodePath = ZKUtil.joinZNode(nodePath, serverName);
-    try {
-      byte[] data;
-      try {
-        data = ZKUtil.getData(zkw, nodePath);
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException();
-      }
-      if (data != null) {
-        result = ZKUtil.parseRegionStoreSequenceIds(data);
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
-          + serverName + "; region=" + encodedRegionName, e);
-    } catch (DeserializationException e) {
-      LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
-    }
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 58a0055..cac6fd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -265,12 +265,6 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public Map<String, HRegion> getRecoveringRegions() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
   public ServerNonceManager getNonceManager() {
     // TODO Auto-generated method stub
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 2ef200f..643940c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -323,7 +323,7 @@ public class TestIOFencing {
         @Override
         public boolean evaluate() throws Exception {
           Region newRegion = newServer.getOnlineRegion(REGION_NAME);
-          return newRegion != null && !newRegion.isRecovering();
+          return newRegion != null;
         }
       });
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index e3c1df3..f60c1e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -36,20 +36,16 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -121,15 +117,6 @@ public class TestSerialization {
 
   }
 
-  @Test
-  public void testSplitLogTask() throws DeserializationException {
-    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), 
-      RecoveryMode.LOG_REPLAY);
-    byte [] bytes = slt.toByteArray();
-    SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
-    assertTrue(slt.equals(sltDeserialized));
-  }
-
   @Test public void testCompareFilter() throws Exception {
     Filter f = new RowFilter(CompareOperator.EQUAL,
       new BinaryComparator(Bytes.toBytes("testRowOne-2")));

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 68770b9..a34b651 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -264,7 +264,7 @@ public class TestReplicasClient {
     } catch (Exception e){}
     // first version is '0'
     AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
-      getRS().getServerName(), hri, null, null);
+      getRS().getServerName(), hri, null);
     AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
     Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
     Assert.assertEquals(responseOpen.getOpeningState(0),

http://git-wip-us.apache.org/repos/asf/hbase/blob/4132314f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 119c225..964bb26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -557,11 +557,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
-  public Map<String, HRegion> getRecoveringRegions() {
-    return null;
-  }
-
-  @Override
   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
       UpdateFavoredNodesRequest request) throws ServiceException {
     return null;