You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/18 13:39:05 UTC

[hbase] branch HBASE-27109/table_based_rqs updated: HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-27109/table_based_rqs by this push:
     new 1f14f4ae436 HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
1f14f4ae436 is described below

commit 1f14f4ae43632f5eaa81ff7e9af066f35ff97ed4
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Mar 18 21:38:53 2023 +0800

    HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
    
    Signed-off-by: Liangjun He <he...@apache.org>
---
 .../org/apache/hadoop/hbase/util/JsonMapper.java   |   4 +
 .../protobuf/server/master/MasterProcedure.proto   |   1 +
 .../hbase/replication/ReplicationQueueStorage.java |  21 ++
 .../replication/ReplicationStorageFactory.java     |  27 +-
 .../replication/TableReplicationQueueStorage.java  |  20 ++
 .../org/apache/hadoop/hbase/master/HMaster.java    |  41 +++
 .../AssignReplicationQueuesProcedure.java          |  48 ++-
 .../ClaimReplicationQueueRemoteProcedure.java      |  32 ++
 .../OfflineTableReplicationQueueStorage.java       | 382 +++++++++++++++++++++
 .../master/replication/ReplicationPeerManager.java |   2 +-
 .../regionserver/ReplicationSourceManager.java     | 188 +++++-----
 .../regionserver/ReplicationSyncUp.java            | 195 +++++++++--
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |   4 +
 .../hbase/master/cleaner/TestLogsCleaner.java      |   4 +-
 .../replication/TestReplicationSyncUpTool.java     | 185 +++++-----
 .../replication/TestReplicationSyncUpToolBase.java |   3 +-
 ...estReplicationSyncUpToolWithBulkLoadedData.java |  58 ++--
 .../TestTableReplicationQueueStorage.java          |  51 +++
 ...tReplicationSyncUpToolWithMultipleAsyncWAL.java |   3 -
 .../TestReplicationSyncUpToolWithMultipleWAL.java  |   3 -
 .../regionserver/TestReplicationSourceManager.java |  14 +-
 .../regionserver/TestSerialReplicationChecker.java |   4 +-
 22 files changed, 1025 insertions(+), 265 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
index 0ff131f23bf..f2c4585a6a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
@@ -40,4 +40,8 @@ public final class JsonMapper {
   public static String writeObjectAsString(Object object) throws IOException {
     return GSON.toJson(object);
   }
+
+  public static <T> T fromJson(String json, Class<T> clazz) {
+    return GSON.fromJson(json, clazz);
+  }
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 14d07c17c88..901abf6bd0c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
 enum AssignReplicationQueuesState {
   ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
   ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
+  ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
 }
 
 message AssignReplicationQueuesStateData {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 1e36bbeb78f..b5bc64eb55a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -203,4 +203,25 @@ public interface ReplicationQueueStorage {
    * Add the given hfile refs to the given peer.
    */
   void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
+
+  // the below method is for cleaning up stale data after running ReplicationSyncUp
+  /**
+   * Remove all the last sequence ids and hfile references data which are written before the given
+   * timestamp.
+   * <p/>
+   * The data of these two types are not used by replication directly.
+   * <p/>
+   * For last sequence ids, we will check it in serial replication, to make sure that we will
+   * replicate all edits in order, so if there are stale data, the worst case is that we will stop
+   * replicating as we think we still need to finish previous ranges first, although actually we
+   * have already replicated them out.
+   * <p/>
+   * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
+   * replicate them out, so if there are stale data, the worst case is that we can not remove these
+   * hfiles, although actually they have already been replicated out.
+   * <p/>
+   * So it is OK for us to just bring up the cluster first, and then use this method to delete the
+   * stale data, i.e., the data which are written before a specific timestamp.
+   */
+  void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index dc4317feaa4..4d5fcb45634 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -27,8 +28,11 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Used to create replication storage(peer, queue) classes.
@@ -36,11 +40,15 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationStorageFactory {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);
+
   public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name";
 
   public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
     TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
 
+  public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl";
+
   public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
     throws IOException {
     return TableDescriptorBuilder.newBuilder(tableName)
@@ -72,15 +80,26 @@ public final class ReplicationStorageFactory {
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
     Configuration conf) {
-    return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
-      REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
+    return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
+      .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
   }
 
   /**
    * Create a new {@link ReplicationQueueStorage}.
    */
   public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
-    TableName tableName) {
-    return new TableReplicationQueueStorage(conn, tableName);
+    Configuration conf, TableName tableName) {
+    Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
+      TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
+    try {
+      Constructor<? extends ReplicationQueueStorage> c =
+        clazz.getConstructor(Connection.class, TableName.class);
+      return c.newInstance(conn, tableName);
+    } catch (Exception e) {
+      LOG.debug(
+        "failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
+        e);
+      return ReflectionUtils.newInstance(clazz, conf, tableName);
+    }
   }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index f3870f4d09d..e59edd52f79 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -594,4 +594,24 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
       throw new ReplicationException("failed to batch update hfile references", e);
     }
   }
+
+  @Override
+  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
+    try (Table table = conn.getTable(tableName);
+      ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
+        .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
+      for (;;) {
+        Result r = scanner.next();
+        if (r == null) {
+          break;
+        }
+        Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
+          .addFamily(HFILE_REF_FAMILY, ts);
+        table.delete(delete);
+      }
+    } catch (IOException e) {
+      throw new ReplicationException(
+        "failed to remove last sequence ids and hfile references before timestamp " + ts, e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 67d0f889d64..f2086393e63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -34,6 +34,9 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,6 +62,7 @@ import java.util.stream.Collectors;
 import javax.servlet.http.HttpServlet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CatalogFamilyFormat;
@@ -226,6 +230,8 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
 import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
 import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@@ -246,6 +252,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JsonMapper;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RetryCounter;
@@ -267,7 +274,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
 import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
@@ -1278,6 +1287,38 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
     status.setStatus("Initializing MOB Cleaner");
     initMobCleaner();
 
+    // delete the stale data for replication sync up tool if necessary
+    status.setStatus("Cleanup ReplicationSyncUp status if necessary");
+    Path replicationSyncUpInfoFile =
+      new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
+    if (dataFs.exists(replicationSyncUpInfoFile)) {
+      // info file is available, load the timestamp and use it to clean up stale data in replication
+      // queue storage.
+      byte[] data;
+      try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
+        data = ByteStreams.toByteArray(in);
+      }
+      ReplicationSyncUpToolInfo info = null;
+      try {
+        info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
+      } catch (JsonParseException e) {
+        // usually this should be a partial file, which means the ReplicationSyncUp tool did not
+        // finish properly, so not a problem. Here we do not clean up the status as we do not know
+        // the reason why the tool did not finish properly, so let users clean the status up
+        // manually
+        LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
+      }
+      if (info != null) {
+        LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
+          info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
+            .format(Instant.ofEpochMilli(info.getStartTimeMs())));
+        replicationPeerManager.getQueueStorage()
+          .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
+        // delete the file after removing the stale data, so next time we do not need to do this
+        // again.
+        dataFs.delete(replicationSyncUpInfoFile, false);
+      }
+    }
     status.setStatus("Calling postStartMaster coprocessors");
     if (this.cpHost != null) {
       // don't let cp initialization errors kill the master
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
index d33259dd436..b547c87009d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
@@ -24,7 +24,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -102,7 +105,7 @@ public class AssignReplicationQueuesProcedure
     }
   }
 
-  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
+  private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
     Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
       .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
     ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
@@ -130,18 +133,51 @@ public class AssignReplicationQueuesProcedure
     return Flow.HAS_MORE_STATE;
   }
 
+  // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
+  // claiming the replication queues and delete them instead.
+  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
+  }
+
+  private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
+    ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
+    for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
+      storage.removeQueue(queueId);
+    }
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    // remove the region server record file
+    mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     try {
       switch (state) {
         case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
-          addMissingQueues(env);
-          retryCounter = null;
-          setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
-          return Flow.HAS_MORE_STATE;
+          if (shouldSkip(env)) {
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+            return Flow.HAS_MORE_STATE;
+          } else {
+            addMissingQueues(env);
+            retryCounter = null;
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
+            return Flow.HAS_MORE_STATE;
+          }
         case ASSIGN_REPLICATION_QUEUES_CLAIM:
-          return claimQueues(env);
+          if (shouldSkip(env)) {
+            retryCounter = null;
+            setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+            return Flow.HAS_MORE_STATE;
+          } else {
+            return claimQueues(env);
+          }
+        case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
+          removeQueues(env);
+          return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException("unhandled state=" + state);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
index 7b637384398..d3aeeba541a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
@@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
 import java.util.Optional;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
 import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
 import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +60,32 @@ public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
     this.targetServer = targetServer;
   }
 
+  // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
+  // claiming the replication queue and finish this procedure directly.
+  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+    MasterFileSystem mfs = env.getMasterFileSystem();
+    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+    return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
+  }
+
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    try {
+      if (shouldSkip(env)) {
+        LOG.info("Skip claiming {} because replication sync up has already done it for us",
+          getServerName());
+        return null;
+      }
+    } catch (IOException e) {
+      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
+        getServerName(), e);
+      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
+      return null;
+    }
+    return super.execute(env);
+  }
+
   @Override
   public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
     assert targetServer.equals(remote);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
new file mode 100644
index 00000000000..9faca74f710
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@InterfaceAudience.Private
+public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage {
+
+  private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets =
+    new HashMap<>();
+
+  private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>();
+
+  private final Map<String, Set<String>> hfileRefs = new HashMap<>();
+
+  private void loadRegionInfo(FileSystem fs, Path regionDir,
+    NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException {
+    RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+    // TODO: we assume that there will not be too many regions for hbase:replication table, so
+    // here we just iterate over all the regions to find out the overlapped ones. Can be optimized
+    // later.
+    Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<byte[], RegionInfo> entry = iter.next();
+      if (hri.isOverlap(entry.getValue())) {
+        if (hri.getRegionId() > entry.getValue().getRegionId()) {
+          // we are newer, remove the old hri, we can not break here as if hri is a merged region,
+          // we need to remove all its parent regions.
+          iter.remove();
+        } else {
+          // we are older, just return, skip the below add
+          return;
+        }
+      }
+
+    }
+    startKey2RegionInfo.put(hri.getStartKey(), hri);
+  }
+
+  private void loadOffsets(Result result) {
+    NavigableMap<byte[], byte[]> map =
+      result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY);
+    if (map == null || map.isEmpty()) {
+      return;
+    }
+    Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>();
+    map.forEach((k, v) -> {
+      String walGroup = Bytes.toString(k);
+      ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
+      offsetMap.put(walGroup, offset);
+    });
+    ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
+    offsets.put(queueId, offsetMap);
+  }
+
+  private void loadLastSequenceIds(Result result) {
+    NavigableMap<byte[], byte[]> map =
+      result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY);
+    if (map == null || map.isEmpty()) {
+      return;
+    }
+    Map<String, Long> lastSeqIdMap = new HashMap<>();
+    map.forEach((k, v) -> {
+      String encodedRegionName = Bytes.toString(k);
+      long lastSeqId = Bytes.toLong(v);
+      lastSeqIdMap.put(encodedRegionName, lastSeqId);
+    });
+    String peerId = Bytes.toString(result.getRow());
+    lastSequenceIds.put(peerId, lastSeqIdMap);
+  }
+
+  private void loadHFileRefs(Result result) {
+    NavigableMap<byte[], byte[]> map =
+      result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY);
+    if (map == null || map.isEmpty()) {
+      return;
+    }
+    Set<String> refs = new HashSet<>();
+    map.keySet().forEach(ref -> refs.add(Bytes.toString(ref)));
+    String peerId = Bytes.toString(result.getRow());
+    hfileRefs.put(peerId, refs);
+  }
+
+  private void loadReplicationQueueData(Configuration conf, TableName tableName)
+    throws IOException {
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
+    FileSystem fs = tableDir.getFileSystem(conf);
+    FileStatus[] regionDirs =
+      CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
+    if (regionDirs == null) {
+      return;
+    }
+    NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (FileStatus regionDir : regionDirs) {
+      loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo);
+    }
+    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+    for (RegionInfo hri : startKey2RegionInfo.values()) {
+      try (ClientSideRegionScanner scanner =
+        new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) {
+        for (;;) {
+          Result result = scanner.next();
+          if (result == null) {
+            break;
+          }
+          loadOffsets(result);
+          loadLastSequenceIds(result);
+          loadHFileRefs(result);
+        }
+      }
+    }
+  }
+
+  public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName)
+    throws IOException {
+    loadReplicationQueueData(conf, tableName);
+  }
+
+  @Override
+  public synchronized void setOffset(ReplicationQueueId queueId, String walGroup,
+    ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
+    Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
+    if (offsetMap == null) {
+      offsetMap = new HashMap<>();
+      offsets.put(queueId, offsetMap);
+    }
+    offsetMap.put(walGroup, offset);
+    Map<String, Long> lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId());
+    if (lastSeqIdsMap == null) {
+      lastSeqIdsMap = new HashMap<>();
+      lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap);
+    }
+    for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
+      Long oldSeqId = lastSeqIdsMap.get(entry.getKey());
+      if (oldSeqId == null || oldSeqId < entry.getValue()) {
+        lastSeqIdsMap.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  @Override
+  public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
+    throws ReplicationException {
+    Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
+    if (offsetMap == null) {
+      return Collections.emptyMap();
+    }
+    return ImmutableMap.copyOf(offsetMap);
+  }
+
+  @Override
+  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
+    throws ReplicationException {
+    return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId))
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
+    throws ReplicationException {
+    return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName))
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
+    throws ReplicationException {
+    return offsets.keySet().stream()
+      .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName))
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException {
+    return offsets.entrySet().stream()
+      .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue())))
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized List<ServerName> listAllReplicators() throws ReplicationException {
+    return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct()
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
+    ServerName targetServerName) throws ReplicationException {
+    Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId);
+    if (offsetMap == null) {
+      return Collections.emptyMap();
+    }
+    offsets.put(queueId.claim(targetServerName), offsetMap);
+    return ImmutableMap.copyOf(offsetMap);
+  }
+
+  @Override
+  public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
+    offsets.remove(queueId);
+  }
+
+  @Override
+  public synchronized void removeAllQueues(String peerId) throws ReplicationException {
+    Iterator<ReplicationQueueId> iter = offsets.keySet().iterator();
+    while (iter.hasNext()) {
+      if (iter.next().getPeerId().equals(peerId)) {
+        iter.remove();
+      }
+    }
+  }
+
+  @Override
+  public synchronized long getLastSequenceId(String encodedRegionName, String peerId)
+    throws ReplicationException {
+    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+    if (lastSeqIdMap == null) {
+      return HConstants.NO_SEQNUM;
+    }
+    Long lastSeqId = lastSeqIdMap.get(encodedRegionName);
+    return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM;
+  }
+
+  @Override
+  public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+    throws ReplicationException {
+    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+    if (lastSeqIdMap == null) {
+      lastSeqIdMap = new HashMap<>();
+      lastSequenceIds.put(peerId, lastSeqIdMap);
+    }
+    lastSeqIdMap.putAll(lastSeqIds);
+  }
+
+  @Override
+  public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException {
+    lastSequenceIds.remove(peerId);
+  }
+
+  @Override
+  public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+    throws ReplicationException {
+    Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+    if (lastSeqIdMap == null) {
+      return;
+    }
+    for (String encodedRegionName : encodedRegionNames) {
+      lastSeqIdMap.remove(encodedRegionName);
+    }
+  }
+
+  @Override
+  public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException {
+    hfileRefs.remove(peerId);
+  }
+
+  @Override
+  public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+    throws ReplicationException {
+    Set<String> refs = hfileRefs.get(peerId);
+    if (refs == null) {
+      refs = new HashSet<>();
+      hfileRefs.put(peerId, refs);
+    }
+    for (Pair<Path, Path> pair : pairs) {
+      refs.add(pair.getSecond().getName());
+    }
+  }
+
+  @Override
+  public synchronized void removeHFileRefs(String peerId, List<String> files)
+    throws ReplicationException {
+    Set<String> refs = hfileRefs.get(peerId);
+    if (refs == null) {
+      return;
+    }
+    refs.removeAll(files);
+  }
+
+  @Override
+  public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
+    return ImmutableList.copyOf(hfileRefs.keySet());
+  }
+
+  @Override
+  public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException {
+    Set<String> refs = hfileRefs.get(peerId);
+    if (refs == null) {
+      return Collections.emptyList();
+    }
+    return ImmutableList.copyOf(refs);
+  }
+
+  @Override
+  public synchronized Set<String> getAllHFileRefs() throws ReplicationException {
+    return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+  }
+
+  @Override
+  public boolean hasData() throws ReplicationException {
+    return true;
+  }
+
+  @Override
+  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void batchUpdateLastSequenceIds(
+    List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+    throws ReplicationException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index d8c1b5c64c5..bb170be64af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -641,7 +641,7 @@ public class ReplicationPeerManager {
       };
     }
     return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
-      services.getConnection(), replicationQueueTableName), initializer);
+      services.getConnection(), conf, replicationQueueTableName), initializer);
   }
 
   public static ReplicationPeerManager create(MasterServices services, String clusterId)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f3d07315240..f34f0d194e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -25,7 +25,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -125,6 +124,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  */
 @InterfaceAudience.Private
 public class ReplicationSourceManager {
+
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
   // all the sources that read this RS's logs and every peer only has one replication source
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -146,13 +146,15 @@ public class ReplicationSourceManager {
 
   // All logs we are currently tracking
   // Index structure of the map is: queue_id->logPrefix/logGroup->logs
-  // For normal replication source, the peer id is same with the queue id
   private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
   // Logs for recovered sources we are currently tracking
   // the map is: queue_id->logPrefix/logGroup->logs
-  // For recovered source, the queue id's format is peer_id-servername-*
+  // For a recovered source, the WAL files should have already been moved to oldLogDir. Old WAL
+  // files can have different layouts, for example with or without server name sub directories,
+  // so here we record the full path instead of just the name. That way, when refreshing, we can
+  // enqueue the WAL file again without trying to guess its real path.
   private final ConcurrentMap<ReplicationQueueId,
-    Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
+    Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;
 
   private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
 
@@ -514,9 +516,9 @@ public class ReplicationSourceManager {
         ReplicationSourceInterface recoveredReplicationSource =
           createRefreshedSource(oldSourceQueueId, peer);
         this.oldsources.add(recoveredReplicationSource);
-        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
+        for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
           .values()) {
-          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
+          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
         }
         toStartup.add(recoveredReplicationSource);
       }
@@ -656,9 +658,11 @@ public class ReplicationSourceManager {
   void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
     if (source.isRecovered()) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
+      NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
       if (wals != null) {
-        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
+        // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
+        NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
+          .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
         if (walsToRemove.isEmpty()) {
           return;
         }
@@ -814,6 +818,93 @@ public class ReplicationSourceManager {
   }
 
   void claimQueue(ReplicationQueueId queueId) {
+    claimQueue(queueId, false);
+  }
+
+  // sorted from oldest to newest
+  private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
+    Map<String, ReplicationGroupOffset> offsets) throws IOException {
+    List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
+      URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
+    if (syncUp) {
+      // we also need to list WALs directory for ReplicationSyncUp
+      walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
+    }
+    PriorityQueue<Path> walFilesPQ =
+      new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
+    // sort the wal files and also filter out replicated files
+    for (Path file : walFiles) {
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
+      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
+      if (shouldReplicate(groupOffset, file.getName())) {
+        walFilesPQ.add(file);
+      } else {
+        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
+          groupOffset);
+      }
+    }
+    return walFilesPQ;
+  }
+
+  private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
+    ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
+    ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
+    if (peer == null || peer != oldPeer) {
+      src.terminate("Recovered queue doesn't belong to any current peer");
+      deleteQueue(claimedQueueId);
+      return;
+    }
+    // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
+    // transiting to STANDBY state. The only exception is we are in STANDBY state and
+    // transiting to DA, under this state we will replay the remote WAL and they need to be
+    // replicated back.
+    if (peer.getPeerConfig().isSyncReplication()) {
+      Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+        peer.getSyncReplicationStateAndNewState();
+      if (
+        (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
+          && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
+          || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
+      ) {
+        src.terminate("Sync replication peer is in STANDBY state");
+        deleteQueue(claimedQueueId);
+        return;
+      }
+    }
+    // track sources in walsByIdRecoveredQueues
+    Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
+    walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
+    for (Path wal : walFiles) {
+      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
+      NavigableSet<Path> wals = walsByGroup.get(walPrefix);
+      if (wals == null) {
+        wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
+        walsByGroup.put(walPrefix, wals);
+      }
+      wals.add(wal);
+    }
+    oldsources.add(src);
+    LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId,
+      walFiles.size());
+    for (Path wal : walFiles) {
+      LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
+      src.enqueueLog(wal);
+    }
+    src.startup();
+  }
+
+  /**
+   * Claim a replication queue.
+   * <p/>
+   * We add a flag to indicate whether we are called by ReplicationSyncUp. For normal claiming queue
+   * operation, we are the last step of a SCP, so we can assume that all the WAL files are under
+   * oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a
+   * region server which has not been processed by SCP yet, so we still need to look at its WALs
+   * directory.
+   * @param queueId the replication queue id we want to claim
+   * @param syncUp  whether we are called by ReplicationSyncUp
+   */
+  void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
     // Wait a bit before transferring the queues, we may be shutting down.
     // This sleep may not be enough in some cases.
     try {
@@ -872,76 +963,17 @@ public class ReplicationSourceManager {
       server.abort("Failed to create replication source after claiming queue.", e);
       return;
     }
-    List<Path> walFiles;
+    PriorityQueue<Path> walFiles;
     try {
-      walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
-        URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
+      walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
     } catch (IOException e) {
-      LOG.error("Can not list all wal files for peer {} and queue {}", peerId, queueId, e);
-      server.abort("Can not list all wal files after claiming queue.", e);
+      LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
+      server.abort("Can not list wal files after claiming queue.", e);
       return;
     }
-    PriorityQueue<Path> walFilesPQ = new PriorityQueue<>(
-      Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
-        .thenComparing(Path::getName));
-    // sort the wal files and also filter out replicated files
-    for (Path file : walFiles) {
-      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
-      ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
-      if (shouldReplicate(groupOffset, file.getName())) {
-        walFilesPQ.add(file);
-      } else {
-        LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
-          groupOffset);
-      }
-    }
-    // the method is a bit long, so assign it to null here to avoid later we reuse it again by
-    // mistake, we should use the sorted walFilesPQ instead
-    walFiles = null;
     // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
     synchronized (oldsources) {
-      peer = replicationPeers.getPeer(src.getPeerId());
-      if (peer == null || peer != oldPeer) {
-        src.terminate("Recovered queue doesn't belong to any current peer");
-        deleteQueue(claimedQueueId);
-        return;
-      }
-      // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
-      // transiting to STANDBY state. The only exception is we are in STANDBY state and
-      // transiting to DA, under this state we will replay the remote WAL and they need to be
-      // replicated back.
-      if (peer.getPeerConfig().isSyncReplication()) {
-        Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
-          peer.getSyncReplicationStateAndNewState();
-        if (
-          (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
-            && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
-            || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
-        ) {
-          src.terminate("Sync replication peer is in STANDBY state");
-          deleteQueue(claimedQueueId);
-          return;
-        }
-      }
-      // track sources in walsByIdRecoveredQueues
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
-      for (Path wal : walFilesPQ) {
-        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-        NavigableSet<String> wals = walsByGroup.get(walPrefix);
-        if (wals == null) {
-          wals = new TreeSet<>();
-          walsByGroup.put(walPrefix, wals);
-        }
-        wals.add(wal.getName());
-      }
-      oldsources.add(src);
-      LOG.info("Added source for recovered queue {}", claimedQueueId);
-      for (Path wal : walFilesPQ) {
-        LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
-        src.enqueueLog(new Path(oldLogDir, wal));
-      }
-      src.startup();
+      addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
     }
   }
 
@@ -970,16 +1002,6 @@ public class ReplicationSourceManager {
     return Collections.unmodifiableMap(walsById);
   }
 
-  /**
-   * Get a copy of the wals of the recovered sources on this rs
-   * @return a sorted set of wal names
-   */
-  @RestrictedApi(explanation = "Should only be called in tests", link = "",
-      allowedOnPath = ".*/src/test/.*")
-  Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
-    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
-  }
-
   /**
    * Get a list of all the normal sources of this rs
    * @return list of all normal sources
@@ -1099,8 +1121,6 @@ public class ReplicationSourceManager {
     return this.globalMetrics;
   }
 
-  @RestrictedApi(explanation = "Should only be called in tests", link = "",
-      allowedOnPath = ".*/src/test/.*")
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index b63ad473719..f071cf6f1f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
@@ -35,11 +39,18 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.JsonMapper;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -59,6 +70,31 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class ReplicationSyncUp extends Configured implements Tool {
 
+  public static class ReplicationSyncUpToolInfo {
+
+    private long startTimeMs;
+
+    public ReplicationSyncUpToolInfo() {
+    }
+
+    public ReplicationSyncUpToolInfo(long startTimeMs) {
+      this.startTimeMs = startTimeMs;
+    }
+
+    public long getStartTimeMs() {
+      return startTimeMs;
+    }
+
+    public void setStartTimeMs(long startTimeMs) {
+      this.startTimeMs = startTimeMs;
+    }
+  }
+
+  // For storing the information used to skip replicating some wals after the cluster is back online
+  public static final String INFO_DIR = "ReplicationSyncUp";
+
+  public static final String INFO_FILE = "info";
+
   private static final long SLEEP_TIME = 10000;
 
   /**
@@ -69,41 +105,116 @@ public class ReplicationSyncUp extends Configured implements Tool {
     System.exit(ret);
   }
 
-  private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException {
-    List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
-    return rsZNodes == null
-      ? Collections.emptySet()
-      : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+  // Find region servers under wal directory
+  // Here we only care about the region servers which may still be alive, as we need to add
+  // replication queues for them if missing. For dead region servers which have already been
+  // fully processed, we do not need to add their replication queues again, as that has already
+  // been done in SCP.
+  private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException {
+    FileStatus[] statuses;
+    try {
+      statuses = walFs.listStatus(walDir);
+    } catch (FileNotFoundException e) {
+      System.out.println("WAL directory " + walDir + " does not exists, ignore");
+      return Collections.emptySet();
+    }
+    Set<ServerName> regionServers = new HashSet<>();
+    for (FileStatus status : statuses) {
+      // All wal files under the walDir are within their region server's directory
+      if (!status.isDirectory()) {
+        continue;
+      }
+      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath());
+      if (sn != null) {
+        regionServers.add(sn);
+      }
+    }
+    return regionServers;
+  }
+
+  private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer,
+    Set<String> peerIds) throws ReplicationException {
+    Set<String> existingQueuePeerIds = new HashSet<>();
+    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer);
+    for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
+      ReplicationQueueId queueId = iter.next();
+      if (!queueId.isRecovered()) {
+        existingQueuePeerIds.add(queueId.getPeerId());
+      }
+    }
+
+    for (String peerId : peerIds) {
+      if (!existingQueuePeerIds.contains(peerId)) {
+        ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId);
+        System.out.println("Add replication queue " + queueId + " for claiming");
+        storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN,
+          Collections.emptyMap());
+      }
+    }
+  }
+
+  private void addMissingReplicationQueues(ReplicationQueueStorage storage,
+    Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException {
+    for (ServerName regionServer : regionServers) {
+      addMissingReplicationQueues(storage, regionServer, peerIds);
+    }
   }
 
   // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
   // replication queues for the dead region servers first and then replicate the data out.
-  private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr)
-    throws ReplicationException, KeeperException {
-    // TODO: reimplement this tool
-    // List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators();
-    // Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
-    // for (ServerName sn : replicators) {
-    // if (!liveRegionServers.contains(sn)) {
-    // List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn);
-    // System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
-    // for (String queue : replicationQueues) {
-    // mgr.claimQueue(sn, queue);
-    // }
-    // }
-    // }
+  private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers)
+    throws ReplicationException, KeeperException, IOException {
+    // union the region servers from both places, i.e., from the wal directory and from the
+    // records in replication queue storage.
+    Set<ServerName> replicators = new HashSet<>(regionServers);
+    ReplicationQueueStorage queueStorage = mgr.getQueueStorage();
+    replicators.addAll(queueStorage.listAllReplicators());
+    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
+    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
+    for (ServerName sn : replicators) {
+      List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn);
+      System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
+      // record the rs name, so when master restarts, we will skip claiming its replication queue
+      fs.createNewFile(new Path(infoDir, sn.getServerName()));
+      for (ReplicationQueueId queueId : replicationQueues) {
+        mgr.claimQueue(queueId, true);
+      }
+    }
+  }
+
+  private void writeInfoFile(FileSystem fs) throws IOException {
+    // Record the info of this run. Currently only record the time we run the job. We will use this
+    // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
+    // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
+    ReplicationSyncUpToolInfo info =
+      new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
+    String json = JsonMapper.writeObjectAsString(info);
+    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
+    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+      out.write(Bytes.toBytes(json));
+    }
   }
 
   @Override
   public int run(String[] args) throws Exception {
     Abortable abortable = new Abortable() {
+
+      private volatile boolean abort = false;
+
       @Override
       public void abort(String why, Throwable e) {
+        if (isAborted()) {
+          return;
+        }
+        abort = true;
+        System.err.println("Aborting because of " + why);
+        e.printStackTrace();
+        System.exit(1);
       }
 
       @Override
       public boolean isAborted() {
-        return false;
+        return abort;
       }
     };
     Configuration conf = getConf();
@@ -114,16 +225,24 @@ public class ReplicationSyncUp extends Configured implements Tool {
       Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
       Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
-      System.out.println("Start Replication Server start");
+      System.out.println("Start Replication Server");
+      writeInfoFile(fs);
       Replication replication = new Replication();
-      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
+      // use offline table replication queue storage
+      getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
+        OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
+      DummyServer server = new DummyServer(getConf(), zkw);
+      replication.initialize(server, fs, new Path(logDir, server.toString()), oldLogDir,
         new WALFactory(conf,
           ServerName
             .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
           null, false));
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init();
-      claimReplicationQueues(zkw, manager);
+      Set<ServerName> regionServers = listRegionServers(fs, logDir);
+      addMissingReplicationQueues(manager.getQueueStorage(), regionServers,
+        manager.getReplicationPeers().getAllPeerIds());
+      claimReplicationQueues(manager, regionServers);
       while (manager.activeFailoverTaskCount() > 0) {
         Thread.sleep(SLEEP_TIME);
       }
@@ -138,23 +257,22 @@ public class ReplicationSyncUp extends Configured implements Tool {
     return 0;
   }
 
-  class DummyServer implements Server {
-    String hostname;
-    ZKWatcher zkw;
+  private static final class DummyServer implements Server {
+    private final Configuration conf;
+    private final String hostname;
+    private final ZKWatcher zkw;
+    private volatile boolean abort = false;
 
-    DummyServer(ZKWatcher zkw) {
+    DummyServer(Configuration conf, ZKWatcher zkw) {
       // a unique name in case the first run fails
       hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
+      this.conf = conf;
       this.zkw = zkw;
     }
 
-    DummyServer(String hostname) {
-      this.hostname = hostname;
-    }
-
     @Override
     public Configuration getConfiguration() {
-      return getConf();
+      return conf;
     }
 
     @Override
@@ -174,11 +292,18 @@ public class ReplicationSyncUp extends Configured implements Tool {
 
     @Override
     public void abort(String why, Throwable e) {
+      if (isAborted()) {
+        return;
+      }
+      abort = true;
+      System.err.println("Aborting because of " + why);
+      e.printStackTrace();
+      System.exit(1);
     }
 
     @Override
     public boolean isAborted() {
-      return false;
+      return abort;
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index dce58dbfae4..129636275e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -311,6 +311,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     return matcher.matches() ? Long.parseLong(matcher.group(2)) : NO_TIMESTAMP;
   }
 
+  public static final Comparator<Path> TIMESTAMP_COMPARATOR =
+    Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
+      .thenComparing(Path::getName);
+
   /**
    * Construct the directory name for all WALs on a given server. Dir names currently look like this
    * for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index d7ba6c227c6..5d474bc2164 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -127,8 +127,8 @@ public class TestLogsCleaner {
     TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
     TEST_UTIL.getAdmin().createTable(td);
     TEST_UTIL.waitTableAvailable(tableName);
-    queueStorage =
-      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
+      conf, tableName);
 
     masterServices = mock(MasterServices.class);
     when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 7a89af15902..38225613b9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -22,24 +22,28 @@ import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES
 import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
 import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
 
@@ -55,39 +59,70 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
    */
   @Test
   public void testSyncUpTool() throws Exception {
-
-    /**
-     * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
-     * 'cf1' : replicated 'norep': not replicated
-     */
+    // Set up Replication: on Master and one Slave
+    // Table: t1_syncup and t2_syncup
+    // columnfamily:
+    // 'cf1' : replicated
+    // 'norep': not replicated
     setupReplication();
 
-    /**
-     * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows
-     * into cf1, and 1 rows into norep verify correctly replicated to slave
-     */
+    //
+    // at Master:
+    // t1_syncup: put 100 rows into cf1, and 1 row into norep
+    // t2_syncup: put 200 rows into cf1, and 1 row into norep
+    //
+    // verify correctly replicated to slave
     putAndReplicateRows();
 
-    /**
-     * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows
-     * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on
-     * master, restart hbase on Slave step 4: verify Slave still have the rows before delete
-     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master
-     * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows
-     * from cf1 verify correctly replicated to Slave
-     */
+    // Verify delete works
+    //
+    // step 1: stop hbase on Slave
+    //
+    // step 2: at Master:
+    // t1_syncup: delete 50 rows from cf1
+    // t2_syncup: delete 100 rows from cf1
+    // no change on 'norep'
+    //
+    // step 3: stop hbase on master, restart hbase on Slave
+    //
+    // step 4: verify Slave still has the rows before delete
+    // t1_syncup: 100 rows from cf1
+    // t2_syncup: 200 rows from cf1
+    //
+    // step 5: run syncup tool on Master
+    //
+    // step 6: verify that deletes show up on Slave
+    // t1_syncup: 50 rows from cf1
+    // t2_syncup: 100 rows from cf1
+    //
+    // verify correctly replicated to Slave
     mimicSyncUpAfterDelete();
 
-    /**
-     * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from
-     * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will
-     * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will
-     * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
-     * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step
-     * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not
-     * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to
-     * Slave
-     */
+    // Verify put works
+    //
+    // step 1: stop hbase on Slave
+    //
+    // step 2: at Master:
+    // t1_syncup: put 100 rows from cf1
+    // t2_syncup: put 200 rows from cf1
+    // and put another row on 'norep'
+    // ATTN:
+    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
+    // put to 'norep' will add a new row.
+    //
+    // step 3: stop hbase on master, restart hbase on Slave
+    //
+    // step 4: verify Slave still has the rows before put
+    // t1_syncup: 50 rows from cf1
+    // t2_syncup: 100 rows from cf1
+    //
+    // step 5: run syncup tool on Master
+    //
+    // step 6: verify that puts show up on Slave and 'norep' does not
+    // t1_syncup: 100 rows from cf1
+    // t2_syncup: 200 rows from cf1
+    //
+    // verify correctly replicated to Slave
     mimicSyncUpAfterPut();
   }
 
@@ -172,7 +207,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     int rowCount_ht2Source = countRows(ht2Source);
     assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
       rowCount_ht2Source);
-
+    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
+      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
     shutDownSourceHBaseCluster();
     restartTargetHBaseCluster(1);
 
@@ -184,40 +220,33 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
     assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
 
+    syncUp(UTIL1);
+
     // After sync up
-    for (int i = 0; i < NB_RETRIES; i++) {
-      syncUp(UTIL1);
-      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
-      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
-      if (i == NB_RETRIES - 1) {
-        if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
-          // syncUP still failed. Let's look at the source in case anything wrong there
-          restartSourceHBaseCluster(1);
-          rowCount_ht1Source = countRows(ht1Source);
-          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
-          rowCount_ht2Source = countRows(ht2Source);
-          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
-        }
-        assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
-          rowCountHt1TargetAtPeer1);
-        assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
-          rowCountHt2TargetAtPeer1);
-      }
-      if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
-        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
-        break;
-      } else {
-        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
-          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
-          + rowCountHt2TargetAtPeer1);
-      }
-      Thread.sleep(SLEEP_TIME);
+    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
+    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
+      rowCountHt1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
+      rowCountHt2TargetAtPeer1);
+
+    // check we have recorded the dead region servers and also have an info file
+    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+    FileSystem fs = UTIL1.getTestFileSystem();
+    for (ServerName sn : sourceRses) {
+      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
     }
+    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
+    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);
+
+    restartSourceHBaseCluster(1);
+    // should finally remove all the records after restart
+    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
   }
 
   private void mimicSyncUpAfterPut() throws Exception {
     LOG.debug("mimicSyncUpAfterPut");
-    restartSourceHBaseCluster(1);
     shutDownTargetHBaseCluster();
 
     Put p;
@@ -261,34 +290,14 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
       rowCountHt2TargetAtPeer1);
 
-    // after syun up
-    for (int i = 0; i < NB_RETRIES; i++) {
-      syncUp(UTIL1);
-      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
-      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
-      if (i == NB_RETRIES - 1) {
-        if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
-          // syncUP still failed. Let's look at the source in case anything wrong there
-          restartSourceHBaseCluster(1);
-          rowCount_ht1Source = countRows(ht1Source);
-          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
-          rowCount_ht2Source = countRows(ht2Source);
-          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
-        }
-        assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
-          rowCountHt1TargetAtPeer1);
-        assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
-          rowCountHt2TargetAtPeer1);
-      }
-      if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
-        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
-        break;
-      } else {
-        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
-          + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
-          + rowCountHt2TargetAtPeer1);
-      }
-      Thread.sleep(SLEEP_TIME);
-    }
+    syncUp(UTIL1);
+
+    // after sync up
+    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
+    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
+      rowCountHt1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
+      rowCountHt2TargetAtPeer1);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
index d3142106362..8a28db3b185 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -136,7 +136,8 @@ public abstract class TestReplicationSyncUpToolBase {
   }
 
   final void syncUp(HBaseTestingUtil util) throws Exception {
-    ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
+    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
+      new String[0]);
   }
 
   // Utilities that manager shutdown / restart of source / sink clusters. They take care of
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index b5de8e6324f..afed0483388 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
@@ -45,14 +46,11 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
 
@@ -74,40 +72,50 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
 
   @Test
   public void testSyncUpTool() throws Exception {
-    /**
-     * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
-     * 'cf1' : replicated 'norep': not replicated
-     */
+    // Set up Replication:
+    // on Master and one Slave Table: t1_syncup and t2_syncup
+    // columnfamily:
+    // 'cf1' : replicated
+    // 'norep': not replicated
     setupReplication();
 
-    /**
-     * Prepare 24 random hfile ranges required for creating hfiles
-     */
+    // Prepare 24 random hfile ranges required for creating hfiles
     Iterator<String> randomHFileRangeListIterator = null;
     Set<String> randomHFileRanges = new HashSet<>(24);
     for (int i = 0; i < 24; i++) {
-      randomHFileRanges.add(UTIL1.getRandomUUID().toString());
+      randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
     Collections.sort(randomHFileRangeList);
     randomHFileRangeListIterator = randomHFileRangeList.iterator();
 
-    /**
-     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
-     * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
-     * rows into norep verify correctly replicated to slave
-     */
+    // at Master:
+    // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
+    // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
+    // norep
+    // verify correctly replicated to slave
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);
 
-    /**
-     * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load
-     * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and
-     * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
-     * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step
-     * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not
-     * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to
-     * Slave
-     */
+    // Verify hfile load works
+    //
+    // step 1: stop hbase on Slave
+    //
+    // step 2: at Master:
+    // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
+    // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
+    //
+    // step 3: stop hbase on master, restart hbase on Slave
+    //
+    // step 4: verify Slave still has the rows before load
+    // t1_syncup: 100 rows from cf1
+    // t2_syncup: 200 rows from cf1
+    //
+    // step 5: run syncup tool on Master
+    //
+    // step 6: verify that hfiles show up on Slave and 'norep' does not
+    // t1_syncup: 200 rows from cf1
+    // t2_syncup: 400 rows from cf1
+    // verify correctly replicated to Slave
     mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
 
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
index 4148c1c1a2c..9041831d0e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MD5Hash;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
@@ -420,4 +421,54 @@ public class TestTableReplicationQueueStorage {
     assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
     assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
   }
+
+  private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
+    throws ReplicationException {
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
+    }
+
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<>(null, new Path("file_1")));
+    files1.add(new Pair<>(null, new Path("file_2")));
+    files1.add(new Pair<>(null, new Path("file_3")));
+    storage.addHFileRefs(peerId2, files1);
+  }
+
+  @Test
+  public void testRemoveLastSequenceIdsAndHFileRefsBefore()
+    throws ReplicationException, InterruptedException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+    // make sure we have written these out
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+
+    // should have nothing after removal
+    long ts = EnvironmentEdgeManager.currentTime();
+    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
+
+    Thread.sleep(100);
+    // add again and remove with the old timestamp
+    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+    // make sure we do not delete the data which are written after the given timestamp
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
index 28779be4399..83cd41773ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicationSyncUpTool {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index f495f433bc9..673b841430e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b7564ed9168..1bb9a3e2949 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -45,11 +45,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -125,8 +122,6 @@ public class TestReplicationSourceManager {
 
   private static final TableName TABLE_NAME = TableName.valueOf("test");
 
-  private static TableDescriptor TD;
-
   private static RegionInfo RI;
 
   private static NavigableMap<byte[], Integer> SCOPES;
@@ -152,10 +147,6 @@ public class TestReplicationSourceManager {
     FS = UTIL.getTestFileSystem();
     CONF = new Configuration(UTIL.getConfiguration());
     CONF.setLong("replication.sleep.before.failover", 0);
-    TD = TableDescriptorBuilder.newBuilder(TABLE_NAME)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(F1)
-        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(F2)).build();
 
     RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
     SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -176,7 +167,8 @@ public class TestReplicationSourceManager {
     when(server.getConfiguration()).thenReturn(CONF);
     when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
     when(server.getConnection()).thenReturn(UTIL.getConnection());
-    when(server.getServerName()).thenReturn(ServerName.valueOf("hostname.example.org", 1234, 1));
+    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
+    when(server.getServerName()).thenReturn(sn);
     oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     FS.mkdirs(oldLogDir);
     logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
@@ -189,7 +181,7 @@ public class TestReplicationSourceManager {
     CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
 
     replication = new Replication();
-    replication.initialize(server, FS, logDir, oldLogDir,
+    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
       new WALFactory(CONF, server.getServerName(), null, false));
     manager = replication.getReplicationManager();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 1544265435c..8731adbe4c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -99,8 +99,8 @@ public class TestSerialReplicationChecker {
     TableName repTable = TableName.valueOf("test_serial_rep");
     UTIL.getAdmin()
       .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable));
-    QUEUE_STORAGE =
-      ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), repTable);
+    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
+      UTIL.getConfiguration(), repTable);
   }
 
   @AfterClass