You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/30 13:11:25 UTC

[19/43] hbase git commit: HBASE-19973 Implement a procedure to replay sync replication wal for standby cluster

HBASE-19973 Implement a procedure to replay sync replication wal for standby cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bd7affb5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bd7affb5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bd7affb5

Branch: refs/heads/HBASE-19064
Commit: bd7affb503225992b7dc8846e2392300760efbd9
Parents: 59282a0
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Mar 2 18:43:25 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed May 30 21:01:01 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/MasterProcedure.proto     |  22 +++
 .../apache/hadoop/hbase/executor/EventType.java |   9 +-
 .../hadoop/hbase/executor/ExecutorType.java     |   3 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |   9 +
 .../hadoop/hbase/master/MasterServices.java     |   6 +
 .../procedure/PeerProcedureInterface.java       |   3 +-
 .../hbase/master/procedure/PeerQueue.java       |   3 +-
 .../replication/RecoverStandbyProcedure.java    | 114 +++++++++++
 .../ReplaySyncReplicationWALManager.java        | 139 +++++++++++++
 .../ReplaySyncReplicationWALProcedure.java      | 193 +++++++++++++++++++
 .../hbase/regionserver/HRegionServer.java       |   9 +-
 .../ReplaySyncReplicationWALCallable.java       | 149 ++++++++++++++
 .../SyncReplicationPeerInfoProviderImpl.java    |   3 +
 .../org/apache/hadoop/hbase/util/FSUtils.java   |   5 +
 .../hbase/master/MockNoopMasterServices.java    |   8 +-
 .../master/TestRecoverStandbyProcedure.java     | 186 ++++++++++++++++++
 16 files changed, 854 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index e60881f..d58608a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -468,3 +468,25 @@ message TransitPeerSyncReplicationStateStateData {
   optional SyncReplicationState fromState = 1;
   required SyncReplicationState toState = 2;
 }
+
+enum RecoverStandbyState {
+  RENAME_SYNC_REPLICATION_WALS_DIR = 1;
+  INIT_WORKERS = 2;
+  DISPATCH_TASKS = 3;
+  REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
+}
+
+message RecoverStandbyStateData {
+  required string peer_id = 1;
+}
+
+message ReplaySyncReplicationWALStateData {
+  required string peer_id = 1;
+  required string wal = 2;
+  optional ServerName target_server = 3;
+}
+
+message ReplaySyncReplicationWALParameter {
+  required string peer_id = 1;
+  required string wal = 2;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 922deb8..ad38d1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -281,7 +281,14 @@ public enum EventType {
    *
    * RS_REFRESH_PEER
    */
-  RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
+  RS_REFRESH_PEER(84, ExecutorType.RS_REFRESH_PEER),
+
+  /**
+   * RS replay sync replication wal.<br>
+   *
+   * RS_REPLAY_SYNC_REPLICATION_WAL
+   */
+  RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL);
 
   private final int code;
   private final ExecutorType executor;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 7f130d1..ea97354 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -47,7 +47,8 @@ public enum ExecutorType {
   RS_REGION_REPLICA_FLUSH_OPS  (28),
   RS_COMPACTED_FILES_DISCHARGER (29),
   RS_OPEN_PRIORITY_REGION    (30),
-  RS_REFRESH_PEER               (31);
+  RS_REFRESH_PEER(31),
+  RS_REPLAY_SYNC_REPLICATION_WAL(32);
 
   ExecutorType(int value) {
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e7e585d..484f28f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
@@ -339,6 +340,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   // manager of replication
   private ReplicationPeerManager replicationPeerManager;
 
+  private ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
+
   // buffer for "fatal error" notices from region servers
   // in the cluster. This is only used for assisting
   // operations/debugging.
@@ -828,6 +831,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     initializeMemStoreChunkCreator();
     this.fileSystemManager = new MasterFileSystem(conf);
     this.walManager = new MasterWalManager(this);
+    this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
 
     // enable table descriptors cache
     this.tableDescriptors.setCacheOn();
@@ -3670,4 +3674,9 @@ public class HMaster extends HRegionServer implements MasterServices {
   public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
     return this.snapshotQuotaChore;
   }
+
+  @Override
+  public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
+    return this.replaySyncReplicationWALManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 76aa2d6..c5b9200 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
@@ -465,6 +466,11 @@ public interface MasterServices extends Server {
   ReplicationPeerManager getReplicationPeerManager();
 
   /**
+   * Returns the {@link ReplaySyncReplicationWALManager}.
+   */
+  ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager();
+
+  /**
    * Update the peerConfig for the specified peer
    * @param peerId a short name that identifies the peer
    * @param peerConfig new config for the peer

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
index fc5348e..8ea49a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
@@ -25,7 +25,8 @@ import org.apache.yetus.audience.InterfaceStability;
 public interface PeerProcedureInterface {
 
   enum PeerOperationType {
-    ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE
+    ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE,
+    RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL
   }
 
   String getPeerId();

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
index 1ae0c2f..25feb7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
@@ -49,6 +49,7 @@ class PeerQueue extends Queue<String> {
   }
 
   private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
-    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
+    return proc.getPeerOperationType() != PeerOperationType.REFRESH
+        && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
new file mode 100644
index 0000000..e9e3a97
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
+
+@InterfaceAudience.Private
+public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
+
+  public RecoverStandbyProcedure() {
+  }
+
+  public RecoverStandbyProcedure(String peerId) {
+    super(peerId);
+  }
+
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
+        env.getMasterServices().getReplaySyncReplicationWALManager();
+    switch (state) {
+      case RENAME_SYNC_REPLICATION_WALS_DIR:
+        try {
+          replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId);
+        } catch (IOException e) {
+          LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
+          setFailure("master-recover-standby", e);
+          return Flow.NO_MORE_STATE;
+        }
+        setNextState(RecoverStandbyState.INIT_WORKERS);
+        return Flow.HAS_MORE_STATE;
+      case INIT_WORKERS:
+        replaySyncReplicationWALManager.initPeerWorkers(peerId);
+        setNextState(RecoverStandbyState.DISPATCH_TASKS);
+        return Flow.HAS_MORE_STATE;
+      case DISPATCH_TASKS:
+        addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream()
+            .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
+                replaySyncReplicationWALManager.removeWALRootPath(wal)))
+            .toArray(ReplaySyncReplicationWALProcedure[]::new));
+        setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
+        return Flow.HAS_MORE_STATE;
+      case REMOVE_SYNC_REPLICATION_WALS_DIR:
+        try {
+          replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
+        } catch (IOException e) {
+          LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
+          throw new ProcedureYieldException();
+        }
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
+      throws ProcedureYieldException {
+    try {
+      return replaySyncReplicationWALManager.getReplayWALs(peerId);
+    } catch (IOException e) {
+      LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
+      throw new ProcedureYieldException();
+    }
+  }
+
+  @Override
+  protected RecoverStandbyState getState(int stateId) {
+    return RecoverStandbyState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(RecoverStandbyState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected RecoverStandbyState getInitialState() {
+    return RecoverStandbyState.RENAME_SYNC_REPLICATION_WALS_DIR;
+  }
+
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.RECOVER_STANDBY;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
new file mode 100644
index 0000000..72f5c37
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class ReplaySyncReplicationWALManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
+
+  private static final String REPLAY_SUFFIX = "-replay";
+
+  private final MasterServices services;
+
+  private final Configuration conf;
+
+  private final FileSystem fs;
+
+  private final Path walRootDir;
+
+  private final Path remoteWALDir;
+
+  private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
+
+  public ReplaySyncReplicationWALManager(MasterServices services) {
+    this.services = services;
+    this.conf = services.getConfiguration();
+    this.fs = services.getMasterFileSystem().getWALFileSystem();
+    this.walRootDir = services.getMasterFileSystem().getWALRootDir();
+    this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
+  }
+
+  public Path getPeerRemoteWALDir(String peerId) {
+    return new Path(this.remoteWALDir, peerId);
+  }
+
+  private Path getPeerReplayWALDir(String peerId) {
+    return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
+  }
+
+  public void createPeerRemoteWALDir(String peerId) throws IOException {
+    Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+    if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
+      throw new IOException("Unable to mkdir " + peerRemoteWALDir);
+    }
+  }
+
+  public void renamePeerRemoteWALDir(String peerId) throws IOException {
+    Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+    Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
+    if (fs.exists(peerRemoteWALDir)) {
+      if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
+        throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
+            + peerReplayWALDir + " for peer id=" + peerId);
+      }
+      LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
+        peerId);
+    } else if (!fs.exists(peerReplayWALDir)) {
+      throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
+          + peerReplayWALDir + " not exist for peer id=" + peerId);
+    }
+  }
+
+  public List<Path> getReplayWALs(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+    List<Path> replayWals = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
+    while (iterator.hasNext()) {
+      replayWals.add(iterator.next().getPath());
+    }
+    return replayWals;
+  }
+
+  public void removePeerReplayWALDir(String peerId) throws IOException {
+    Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+    if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
+      throw new IOException(
+          "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
+    }
+  }
+
+  public void initPeerWorkers(String peerId) {
+    BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
+    services.getServerManager().getOnlineServers().keySet()
+        .forEach(server -> servers.offer(server));
+    availServers.put(peerId, servers);
+  }
+
+  public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return availServers.get(peerId).poll(timeout, unit);
+  }
+
+  public void addAvailServer(String peerId, ServerName server) {
+    availServers.get(peerId).offer(server);
+  }
+
+  public String removeWALRootPath(Path path) {
+    String pathStr = path.toString();
+    // remove the "/" too.
+    return pathStr.substring(walRootDir.toString().length() + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
new file mode 100644
index 0000000..8d8a65a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData;
+
+@InterfaceAudience.Private
+public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv>
+    implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class);
+
+  private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000;
+
+  private String peerId;
+
+  private ServerName targetServer = null;
+
+  private String wal;
+
+  private boolean dispatched;
+
+  private ProcedureEvent<?> event;
+
+  private boolean succ;
+
+  public ReplaySyncReplicationWALProcedure() {
+  }
+
+  public ReplaySyncReplicationWALProcedure(String peerId, String wal) {
+    this.peerId = peerId;
+    this.wal = wal;
+  }
+
+  @Override
+  public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+    return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
+        ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build()
+            .toByteArray());
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
+    complete(env, exception);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
+      return;
+    }
+    ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
+        env.getMasterServices().getReplaySyncReplicationWALManager();
+    if (error != null) {
+      LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer,
+        peerId, error);
+      this.succ = false;
+    } else {
+      LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer,
+        peerId);
+      this.succ = true;
+      replaySyncReplicationWALManager.addAvailServer(peerId, targetServer);
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      }
+      // retry
+      dispatched = false;
+    }
+
+    // Try poll a available server
+    if (targetServer == null) {
+      targetServer = env.getMasterServices().getReplaySyncReplicationWALManager()
+          .getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (targetServer == null) {
+        LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId);
+        throw new ProcedureYieldException();
+      }
+    }
+
+    // Dispatch task to target server
+    if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
+      LOG.info(
+        "Can not add remote operation for replay wal {} on {} for peer id={}, "
+            + "this usually because the server is already dead, " + "retry",
+        wal, targetServer, peerId);
+      targetServer = null;
+      throw new ProcedureYieldException();
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    ReplaySyncReplicationWALStateData.Builder builder =
+        ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal);
+    if (targetServer != null) {
+      builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    ReplaySyncReplicationWALStateData data =
+        serializer.deserialize(ReplaySyncReplicationWALStateData.class);
+    peerId = data.getPeerId();
+    wal = data.getWal();
+    if (data.hasTargetServer()) {
+      targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+    }
+  }
+
+  @Override
+  public String getPeerId() {
+    return peerId;
+  }
+
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 41969fd..3aadb9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1926,6 +1926,11 @@ public class HRegionServer extends HasThread implements
     this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
       conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
 
+    if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
+      this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
+        conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 2));
+    }
+
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
     uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
@@ -2871,14 +2876,14 @@ public class HRegionServer extends HasThread implements
   /**
    * @return Return the walRootDir.
    */
-  protected Path getWALRootDir() {
+  public Path getWALRootDir() {
     return walRootDir;
   }
 
   /**
    * @return Return the walFs.
    */
-  protected FileSystem getWALFileSystem() {
+  public FileSystem getWALFileSystem() {
     return walFs;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
new file mode 100644
index 0000000..8dfe3a2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
+
+/**
+ * This callable executed at RS side to replay sync replication wal.
+ */
+@InterfaceAudience.Private
+public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);
+
+  private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE =
+      "hbase.replay.sync.replication.wal.batch.size";
+
+  private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;
+
+  private HRegionServer rs;
+
+  private FileSystem fs;
+
+  private Configuration conf;
+
+  private String peerId;
+
+  private String wal;
+
+  private Exception initError;
+
+  private long batchSize;
+
+  @Override
+  public Void call() throws Exception {
+    if (initError != null) {
+      throw initError;
+    }
+    LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
+    try (Reader reader = getReader()) {
+      List<Entry> entries = readWALEntries(reader);
+      while (!entries.isEmpty()) {
+        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
+            .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
+        HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond());
+        rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst());
+        entries = readWALEntries(reader);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void init(byte[] parameter, HRegionServer rs) {
+    this.rs = rs;
+    this.fs = rs.getWALFileSystem();
+    this.conf = rs.getConfiguration();
+    try {
+      ReplaySyncReplicationWALParameter param =
+          ReplaySyncReplicationWALParameter.parseFrom(parameter);
+      this.peerId = param.getPeerId();
+      this.wal = param.getWal();
+      this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
+        DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
+    } catch (InvalidProtocolBufferException e) {
+      initError = e;
+    }
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
+  }
+
+  private Reader getReader() throws IOException {
+    Path path = new Path(rs.getWALRootDir(), wal);
+    long length = rs.getWALFileSystem().getFileStatus(path).getLen();
+    try {
+      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
+      return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration());
+    } catch (EOFException e) {
+      if (length <= 0) {
+        LOG.warn("File is empty. Could not open {} for reading because {}", path, e);
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  private List<Entry> readWALEntries(Reader reader) throws IOException {
+    List<Entry> entries = new ArrayList<>();
+    if (reader == null) {
+      return entries;
+    }
+    long size = 0;
+    Entry entry = reader.next();
+    while (entry != null) {
+      entries.add(entry);
+      size += entry.getEdit().heapSize();
+      if (size > batchSize) {
+        break;
+      }
+      entry = reader.next();
+    }
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 973e049..e4afc33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -41,6 +41,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
 
   @Override
   public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
+    if (info == null) {
+      return Optional.empty();
+    }
     String peerId = mapping.getPeerId(info);
     if (peerId == null) {
       return Optional.empty();

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 53db140..8a1f948 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -939,6 +939,11 @@ public abstract class FSUtils extends CommonFSUtils {
     }
   }
 
+  public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
+      throws IOException {
+    recoverFileLease(fs, p, conf, null);
+  }
+
   /**
    * Recover file lease. Used when a file might be suspect
    * to be had been left open by another process.

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index dce062c..60132a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
@@ -490,4 +491,9 @@ public class MockNoopMasterServices implements MasterServices {
     SyncReplicationState clusterState) throws ReplicationException, IOException {
     return 0;
   }
-}
+
+  @Override
+  public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd7affb5/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
new file mode 100644
index 0000000..f3d61bb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestRecoverStandbyProcedure {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class);
+
+  private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure");
+
+  private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
+
+  private static final byte[] family = Bytes.toBytes("CF");
+
+  private static final byte[] qualifier = Bytes.toBytes("q");
+
+  private static final long timestamp = System.currentTimeMillis();
+
+  private static final int ROW_COUNT = 1000;
+
+  private static final int WAL_NUMBER = 10;
+
+  private static final int RS_NUMBER = 3;
+
+  private static final String PEER_ID = "1";
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
+
+  private static ProcedureExecutor<MasterProcedureEnv> procExec;
+
+  private static FileSystem fs;
+
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
+    UTIL.startMiniCluster(RS_NUMBER);
+    UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+    conf = UTIL.getConfiguration();
+    HMaster master = UTIL.getHBaseCluster().getMaster();
+    fs = master.getMasterFileSystem().getWALFileSystem();
+    replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager();
+    procExec = master.getMasterProcedureExecutor();
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Before
+  public void setupBeforeTest() throws IOException {
+    UTIL.createTable(tableName, family);
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    try (Admin admin = UTIL.getAdmin()) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
+    setupSyncReplicationWALs();
+    long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID));
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) {
+        Result result = table.get(new Get(Bytes.toBytes(i)).setTimestamp(timestamp));
+        assertNotNull(result);
+        assertEquals(i, Bytes.toInt(result.getValue(family, qualifier)));
+      }
+    }
+  }
+
+  private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
+    Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID);
+    if (!fs.exists(peerRemoteWALDir)) {
+      fs.mkdirs(peerRemoteWALDir);
+    }
+    for (int i = 0; i < WAL_NUMBER; i++) {
+      try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
+        Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
+        writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
+        List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
+        for (Entry entry : entries) {
+          writer.append(entry);
+        }
+        writer.sync(false);
+        LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID);
+      }
+    }
+  }
+
+  private List<Entry> setupWALEntries(int startRow, int endRow) {
+    return IntStream.range(startRow, endRow)
+        .mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i)))
+        .collect(Collectors.toList());
+  }
+
+  private Entry createWALEntry(byte[] row, byte[] value) {
+    WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(row, family, qualifier, timestamp, value));
+    return new Entry(key, edit);
+  }
+}