You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/06 15:56:15 UTC

[hbase] 07/10: HBASE-27218 Support rolling upgrading (#4808)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6321c964eefba473087e9551ad93dcb80a4e247e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Nov 6 16:57:11 2022 +0800

    HBASE-27218 Support rolling upgrading (#4808)
    
    Signed-off-by: Yu Li <li...@apache.org>
---
 .../apache/hadoop/hbase/zookeeper/ZNodePaths.java  |   8 +-
 .../apache/hadoop/hbase/procedure2/Procedure.java  |  15 +
 .../protobuf/server/master/MasterProcedure.proto   |  12 +
 hbase-replication/pom.xml                          |  10 +
 .../hbase/replication/ReplicationQueueStorage.java |  19 ++
 .../replication/TableReplicationQueueStorage.java  |  65 +++-
 .../ZKReplicationQueueStorageForMigration.java     | 351 +++++++++++++++++++++
 .../replication/TestZKReplicationQueueStorage.java | 317 +++++++++++++++++++
 hbase-server/pom.xml                               |   6 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |  13 +
 .../master/procedure/ServerCrashProcedure.java     |  19 ++
 .../replication/AbstractPeerNoLockProcedure.java   |   5 +-
 ...rateReplicationQueueFromZkToTableProcedure.java | 244 ++++++++++++++
 .../master/replication/ModifyPeerProcedure.java    |  26 ++
 .../master/replication/ReplicationPeerManager.java | 104 +++++-
 .../TransitPeerSyncReplicationStateProcedure.java  |  14 +
 .../replication/TestMigrateReplicationQueue.java   | 126 ++++++++
 ...rateReplicationQueueFromZkToTableProcedure.java | 226 +++++++++++++
 ...icationQueueFromZkToTableProcedureRecovery.java | 128 ++++++++
 ...tReplicationPeerManagerMigrateQueuesFromZk.java | 216 +++++++++++++
 .../hbase/replication/TestReplicationBase.java     |   2 +-
 pom.xml                                            |   7 +-
 22 files changed, 1917 insertions(+), 16 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
index d19d2100466..3f66c7cdc0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java
@@ -220,7 +220,11 @@ public class ZNodePaths {
    * @param suffix ending of znode name
    * @return result of properly joining prefix with suffix
    */
-  public static String joinZNode(String prefix, String suffix) {
-    return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix;
+  public static String joinZNode(String prefix, String... suffix) {
+    StringBuilder sb = new StringBuilder(prefix);
+    for (String s : suffix) {
+      sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s);
+    }
+    return sb.toString();
   }
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 34c74d92c16..43adba2bc21 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.metrics.Counter;
 import org.apache.hadoop.hbase.metrics.Histogram;
@@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 
 /**
@@ -1011,6 +1013,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     releaseLock(env);
   }
 
+  protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter)
+    throws ProcedureSuspendedException {
+    if (jitter) {
+      // add a random jitter of up to 10% of the timeout
+      double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1);
+      timeoutMillis += add;
+    }
+    setTimeout(timeoutMillis);
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
   @Override
   public int compareTo(final Procedure<TEnvironment> other) {
     return Long.compare(getProcId(), other.getProcId());
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 76a1d676487..b6f5d7e50bb 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -722,3 +722,15 @@ enum AssignReplicationQueuesState {
 message AssignReplicationQueuesStateData {
   required ServerName crashed_server = 1;
 }
+
+enum MigrateReplicationQueueFromZkToTableState {
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
+  MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
+}
+
+message MigrateReplicationQueueFromZkToTableStateData {
+  repeated string disabled_peer_id = 1;
+}
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index dad93578609..d294cfdbe01 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -98,6 +98,16 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 6f6aee38cc8..1e36bbeb78f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -184,4 +185,22 @@ public interface ReplicationQueueStorage {
    * @return Whether the replication queue table exists
    */
   boolean hasData() throws ReplicationException;
+
+  // the 3 methods below are used for migration
+  /**
+   * Update the replication queue data for a given region server.
+   */
+  void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException;
+
+  /**
+   * Update the last pushed sequence ids for the given regions and peers.
+   */
+  void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException;
+
+  /**
+   * Add the given hfile refs to the given peer.
+   */
+  void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index 392a3692d66..f3870f4d09d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -21,12 +21,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
 
   private final TableName tableName;
 
-  @FunctionalInterface
-  private interface TableCreator {
-
-    void create() throws IOException;
-  }
-
   public TableReplicationQueueStorage(Connection conn, TableName tableName) {
     this.conn = conn;
     this.tableName = tableName;
@@ -541,4 +538,60 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
       throw new ReplicationException("failed to get replication queue table", e);
     }
   }
+
+  @Override
+  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException {
+    List<Put> puts = new ArrayList<>();
+    for (ReplicationQueueData data : datas) {
+      if (data.getOffsets().isEmpty()) {
+        continue;
+      }
+      Put put = new Put(Bytes.toBytes(data.getId().toString()));
+      data.getOffsets().forEach((walGroup, offset) -> {
+        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
+      });
+      puts.add(put);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(puts);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update queues", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException {
+    Map<String, Put> peerId2Put = new HashMap<>();
+    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
+      peerId2Put
+        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
+        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
+          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table
+        .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+    throws ReplicationException {
+    if (hfileRefs.isEmpty()) {
+      return;
+    }
+    Put put = new Put(Bytes.toBytes(peerId));
+    for (String ref : hfileRefs) {
+      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(put);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update hfile references", e);
+    }
+  }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
new file mode 100644
index 00000000000..22cc1314522
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
+/**
+ * Retains only the small subset of methods from the old zookeeper based replication queue storage
+ * that is needed for migrating.
+ */
+@InterfaceAudience.Private
+public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase {
+
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+    "zookeeper.znode.replication.hfile.refs";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
+
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
+    "zookeeper.znode.replication.regions";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
+
+  /**
+   * The name of the znode that contains all replication queues
+   */
+  private final String queuesZNode;
+
+  /**
+   * The name of the znode that contains queues of hfile references to be replicated
+   */
+  private final String hfileRefsZNode;
+
+  private final String regionsZNode;
+
+  public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) {
+    super(zookeeper, conf);
+    String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+    this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
+    this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
+    this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
+      .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
+  }
+
+  public interface MigrationIterator<T> {
+
+    T next() throws Exception;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static final MigrationIterator EMPTY_ITER = new MigrationIterator() {
+
+    @Override
+    public Object next() {
+      return null;
+    }
+  };
+
+  public static final class ZkReplicationQueueData {
+
+    private final ReplicationQueueId queueId;
+
+    private final Map<String, Long> walOffsets;
+
+    public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) {
+      this.queueId = queueId;
+      this.walOffsets = walOffsets;
+    }
+
+    public ReplicationQueueId getQueueId() {
+      return queueId;
+    }
+
+    public Map<String, Long> getWalOffsets() {
+      return walOffsets;
+    }
+  }
+
+  private String getRsNode(ServerName serverName) {
+    return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
+  }
+
+  private String getQueueNode(ServerName serverName, String queueId) {
+    return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
+  }
+
+  private String getFileNode(String queueNode, String fileName) {
+    return ZNodePaths.joinZNode(queueNode, fileName);
+  }
+
+  private String getFileNode(ServerName serverName, String queueId, String fileName) {
+    return getFileNode(getQueueNode(serverName, queueId), fileName);
+  }
+
+  @SuppressWarnings("unchecked")
+  public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues()
+    throws KeeperException {
+    List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
+    if (replicators == null || replicators.isEmpty()) {
+      ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+      return EMPTY_ITER;
+    }
+    Iterator<String> iter = replicators.iterator();
+    return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() {
+
+      private ServerName previousServerName;
+
+      @Override
+      public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception {
+        if (previousServerName != null) {
+          ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName));
+        }
+        if (!iter.hasNext()) {
+          ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+          return null;
+        }
+        String replicator = iter.next();
+        ServerName serverName = ServerName.parseServerName(replicator);
+        previousServerName = serverName;
+        List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
+        if (queueIdList == null || queueIdList.isEmpty()) {
+          return Pair.newPair(serverName, Collections.emptyList());
+        }
+        List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size());
+        for (String queueIdStr : queueIdList) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr);
+          ReplicationQueueId queueId;
+          if (queueInfo.getDeadRegionServers().isEmpty()) {
+            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId());
+          } else {
+            queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(),
+              queueInfo.getDeadRegionServers().get(0));
+          }
+          List<String> wals =
+            ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr));
+          ZkReplicationQueueData queueData;
+          if (wals == null || wals.isEmpty()) {
+            queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap());
+          } else {
+            Map<String, Long> walOffsets = new HashMap<>();
+            for (String wal : wals) {
+              byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal));
+              if (data == null || data.length == 0) {
+                walOffsets.put(wal, 0L);
+              } else {
+                walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data));
+              }
+            }
+            queueData = new ZkReplicationQueueData(queueId, walOffsets);
+          }
+          queueDataList.add(queueData);
+        }
+        return Pair.newPair(serverName, queueDataList);
+      }
+    };
+  }
+
+  public static final class ZkLastPushedSeqId {
+
+    private final String encodedRegionName;
+
+    private final String peerId;
+
+    private final long lastPushedSeqId;
+
+    ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) {
+      this.encodedRegionName = encodedRegionName;
+      this.peerId = peerId;
+      this.lastPushedSeqId = lastPushedSeqId;
+    }
+
+    public String getEncodedRegionName() {
+      return encodedRegionName;
+    }
+
+    public String getPeerId() {
+      return peerId;
+    }
+
+    public long getLastPushedSeqId() {
+      return lastPushedSeqId;
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds()
+    throws KeeperException {
+    List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
+    if (level1Prefixs == null || level1Prefixs.isEmpty()) {
+      ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+      return EMPTY_ITER;
+    }
+    Iterator<String> level1Iter = level1Prefixs.iterator();
+    return new MigrationIterator<List<ZkLastPushedSeqId>>() {
+
+      private String level1Prefix;
+
+      private Iterator<String> level2Iter;
+
+      private String level2Prefix;
+
+      @Override
+      public List<ZkLastPushedSeqId> next() throws Exception {
+        for (;;) {
+          if (level2Iter == null || !level2Iter.hasNext()) {
+            if (!level1Iter.hasNext()) {
+              ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+              return null;
+            }
+            if (level1Prefix != null) {
+              // this will also delete the previous level2Prefix which is under this level1Prefix
+              ZKUtil.deleteNodeRecursively(zookeeper,
+                ZNodePaths.joinZNode(regionsZNode, level1Prefix));
+            }
+            level1Prefix = level1Iter.next();
+            List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper,
+              ZNodePaths.joinZNode(regionsZNode, level1Prefix));
+            if (level2Prefixes != null) {
+              level2Iter = level2Prefixes.iterator();
+              // reset level2Prefix as we have switched level1Prefix, otherwise the below delete
+              // level2Prefix section will delete the znode with this level2Prefix under the new
+              // level1Prefix
+              level2Prefix = null;
+            }
+          } else {
+            if (level2Prefix != null) {
+              ZKUtil.deleteNodeRecursively(zookeeper,
+                ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
+            }
+            level2Prefix = level2Iter.next();
+            List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper,
+              ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix));
+            if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) {
+              return Collections.emptyList();
+            }
+            List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>();
+            for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) {
+              byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode,
+                level1Prefix, level2Prefix, encodedRegionNameAndPeerId));
+              long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data);
+              Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator();
+              String encodedRegionName = level1Prefix + level2Prefix + iter.next();
+              String peerId = iter.next();
+              lastPushedSeqIds
+                .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId));
+            }
+            return Collections.unmodifiableList(lastPushedSeqIds);
+          }
+        }
+      }
+    };
+  }
+
+  private String getHFileRefsPeerNode(String peerId) {
+    return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
+  }
+
+  /**
+   * Pair&lt;PeerId, List&lt;HFileRefs&gt;&gt;
+   */
+  @SuppressWarnings("unchecked")
+  public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException {
+    List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
+    if (peerIds == null || peerIds.isEmpty()) {
+      ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+      return EMPTY_ITER;
+    }
+    Iterator<String> iter = peerIds.iterator();
+    return new MigrationIterator<Pair<String, List<String>>>() {
+
+      private String previousPeerId;
+
+      @Override
+      public Pair<String, List<String>> next() throws KeeperException {
+        if (previousPeerId != null) {
+          ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId));
+        }
+        if (!iter.hasNext()) {
+          ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+          return null;
+        }
+        String peerId = iter.next();
+        List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId));
+        previousPeerId = peerId;
+        return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList());
+      }
+    };
+  }
+
+  public boolean hasData() throws KeeperException {
+    return ZKUtil.checkExists(zookeeper, queuesZNode) != -1
+      || ZKUtil.checkExists(zookeeper, regionsZNode) != -1
+      || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1;
+  }
+
+  public void deleteAllData() throws KeeperException {
+    ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode);
+    ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode);
+    ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode);
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  String getQueuesZNode() {
+    return queuesZNode;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  String getHfileRefsZNode() {
+    return hfileRefsZNode;
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  String getRegionsZNode() {
+    return regionsZNode;
+  }
+}
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
new file mode 100644
index 00000000000..e38b7b134e9
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationQueueStorage {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
+
+  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
+
+  private ZKWatcher zk;
+
+  private ZKReplicationQueueStorageForMigration storage;
+
+  @Rule
+  public final TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    UTIL.shutdownMiniZKCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName());
+    zk = new ZKWatcher(conf, name.getMethodName(), null);
+    storage = new ZKReplicationQueueStorageForMigration(zk, conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode);
+    Closeables.close(zk, true);
+  }
+
+  public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers,
+    String peerId, ServerName deadServer) throws KeeperException {
+    ZKWatcher zk = storage.zookeeper;
+    for (int i = 0; i < nServers; i++) {
+      ServerName sn =
+        ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime());
+      String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString());
+      String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId);
+      ZKUtil.createWithParents(zk, peerZNode);
+      for (int j = 0; j < i; j++) {
+        String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j);
+        ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
+      }
+      String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer);
+      ZKUtil.createWithParents(zk, deadServerPeerZNode);
+      for (int j = 0; j < i; j++) {
+        String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j);
+        if (j > 0) {
+          ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j));
+        } else {
+          ZKUtil.createWithParents(zk, wal);
+        }
+      }
+    }
+    ZKUtil.createWithParents(zk,
+      ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString()));
+  }
+
+  private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName,
+    String peerId) {
+    return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2),
+      encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId);
+  }
+
+  public static Map<String, Set<String>> mockLastPushedSeqIds(
+    ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions,
+    int emptyLevel1Count, int emptyLevel2Count) throws KeeperException {
+    ZKWatcher zk = storage.zookeeper;
+    Map<String, Set<String>> name2PeerIds = new HashMap<>();
+    byte[] bytes = new byte[32];
+    for (int i = 0; i < nRegions; i++) {
+      ThreadLocalRandom.current().nextBytes(bytes);
+      String encodeName = MD5Hash.getMD5AsHex(bytes);
+      String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1);
+      ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1));
+      String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2);
+      ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2));
+      name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2));
+    }
+    int addedEmptyZNodes = 0;
+    for (int i = 0; i < 256; i++) {
+      String level1ZNode =
+        ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i));
+      if (ZKUtil.checkExists(zk, level1ZNode) == -1) {
+        ZKUtil.createWithParents(zk, level1ZNode);
+        addedEmptyZNodes++;
+        if (addedEmptyZNodes <= emptyLevel2Count) {
+          ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab"));
+        }
+        if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) {
+          break;
+        }
+      }
+    }
+    return name2PeerIds;
+  }
+
+  public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers)
+    throws KeeperException {
+    ZKWatcher zk = storage.zookeeper;
+    for (int i = 0; i < nPeers; i++) {
+      String peerId = "peer_" + i;
+      ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId));
+      for (int j = 0; j < i; j++) {
+        ZKUtil.createWithParents(zk,
+          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j));
+      }
+    }
+  }
+
+  @Test
+  public void testDeleteAllData() throws Exception {
+    assertFalse(storage.hasData());
+    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
+    assertTrue(storage.hasData());
+    storage.deleteAllData();
+    assertFalse(storage.hasData());
+  }
+
+  @Test
+  public void testEmptyIter() throws Exception {
+    ZKUtil.createWithParents(zk, storage.getQueuesZNode());
+    ZKUtil.createWithParents(zk, storage.getRegionsZNode());
+    ZKUtil.createWithParents(zk, storage.getHfileRefsZNode());
+    assertNull(storage.listAllQueues().next());
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
+    assertNull(storage.listAllLastPushedSeqIds().next());
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
+    assertNull(storage.listAllHFileRefs().next());
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
+  }
+
+  @Test
+  public void testListAllQueues() throws Exception {
+    String peerId = "1";
+    ServerName deadServer =
+      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
+    int nServers = 10;
+    mockQueuesData(storage, nServers, peerId, deadServer);
+    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
+      storage.listAllQueues();
+    ServerName previousServerName = null;
+    for (int i = 0; i < nServers + 1; i++) {
+      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
+      assertNotNull(pair);
+      if (previousServerName != null) {
+        assertEquals(-1, ZKUtil.checkExists(zk,
+          ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString())));
+      }
+      ServerName sn = pair.getFirst();
+      previousServerName = sn;
+      if (sn.equals(deadServer)) {
+        assertThat(pair.getSecond(), empty());
+      } else {
+        assertEquals(2, pair.getSecond().size());
+        int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname())));
+        ZkReplicationQueueData data0 = pair.getSecond().get(0);
+        assertEquals(peerId, data0.getQueueId().getPeerId());
+        assertEquals(sn, data0.getQueueId().getServerName());
+        assertEquals(n, data0.getWalOffsets().size());
+        for (int j = 0; j < n; j++) {
+          assertEquals(j,
+            data0.getWalOffsets().get(
+              (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
+              .intValue());
+        }
+        ZkReplicationQueueData data1 = pair.getSecond().get(1);
+        assertEquals(peerId, data1.getQueueId().getPeerId());
+        assertEquals(sn, data1.getQueueId().getServerName());
+        assertEquals(n, data1.getWalOffsets().size());
+        for (int j = 0; j < n; j++) {
+          assertEquals(j,
+            data1.getWalOffsets().get(
+              (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j)
+              .intValue());
+        }
+        // the order of the returned result is undetermined
+        if (data0.getQueueId().getSourceServerName().isPresent()) {
+          assertEquals(deadServer, data0.getQueueId().getSourceServerName().get());
+          assertFalse(data1.getQueueId().getSourceServerName().isPresent());
+        } else {
+          assertEquals(deadServer, data1.getQueueId().getSourceServerName().get());
+        }
+      }
+    }
+    assertNull(iter.next());
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode()));
+  }
+
+  @Test
+  public void testListAllLastPushedSeqIds() throws Exception {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    Map<String, Set<String>> name2PeerIds =
+      mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10);
+    MigrationIterator<List<ZkLastPushedSeqId>> iter = storage.listAllLastPushedSeqIds();
+    int emptyListCount = 0;
+    for (;;) {
+      List<ZkLastPushedSeqId> list = iter.next();
+      if (list == null) {
+        break;
+      }
+      if (list.isEmpty()) {
+        emptyListCount++;
+        continue;
+      }
+      for (ZkLastPushedSeqId seqId : list) {
+        name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId());
+        if (seqId.getPeerId().equals(peerId1)) {
+          assertEquals(1, seqId.getLastPushedSeqId());
+        } else {
+          assertEquals(2, seqId.getLastPushedSeqId());
+        }
+      }
+    }
+    assertEquals(10, emptyListCount);
+    name2PeerIds.forEach((encodedRegionName, peerIds) -> {
+      assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
+    });
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode()));
+  }
+
+  @Test
+  public void testListAllHFileRefs() throws Exception {
+    int nPeers = 10;
+    mockHFileRefs(storage, nPeers);
+    MigrationIterator<Pair<String, List<String>>> iter = storage.listAllHFileRefs();
+    String previousPeerId = null;
+    for (int i = 0; i < nPeers; i++) {
+      Pair<String, List<String>> pair = iter.next();
+      assertNotNull("iterator returned null before all " + nPeers + " peers were seen", pair);
+      if (previousPeerId != null) {
+        assertEquals(-1, ZKUtil.checkExists(zk,
+          ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId)));
+      }
+      previousPeerId = pair.getFirst();
+      int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(previousPeerId)));
+      assertEquals(index, pair.getSecond().size());
+    }
+    assertNull(iter.next());
+    assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode()));
+  }
+}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 0dba4aa9833..b61b0252a05 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -102,6 +102,12 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-replication</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-replication</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-balancer</artifactId>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 118457648de..67d0f889d64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
 import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
+import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
 import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
@@ -221,6 +222,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
@@ -1050,6 +1052,17 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
     this.balancer.initialize();
     this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor());
 
+    // try to migrate replication data
+    ZKReplicationQueueStorageForMigration oldReplicationQueueStorage =
+      new ZKReplicationQueueStorageForMigration(zooKeeper, conf);
+    // check whether there is something to migrate and we haven't scheduled a migration procedure
+    // yet
+    if (
+      oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream()
+        .allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure))
+    ) {
+      procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
+    }
     // start up all service threads.
     startupTaskGroup.addTask("Initializing master service threads");
     startServiceThreads();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 487c45e5c5c..97976756d82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
 import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure;
+import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 /**
  * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
@@ -266,6 +268,16 @@ public class ServerCrashProcedure extends
           }
           break;
         case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
+          if (
+            env.getMasterServices().getProcedures().stream()
+              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
+              .anyMatch(p -> !p.isFinished())
+          ) {
+            LOG.info("There is a pending {}, will retry claim replication queue later",
+              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName());
+            suspend(10_000, true);
+            return Flow.NO_MORE_STATE;
+          }
           addChildProcedure(new AssignReplicationQueuesProcedure(serverName));
           setNextState(ServerCrashState.SERVER_CRASH_FINISH);
           break;
@@ -431,6 +443,13 @@ public class ServerCrashProcedure extends
     env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
   }
 
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
   @Override
   public void toStringClassDetails(StringBuilder sb) {
     sb.append(getProcName());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
index 660f9968573..1f0a89f2076 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
@@ -98,10 +98,7 @@ public abstract class AbstractPeerNoLockProcedure<TState>
     }
     long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
     backoffConsumer.accept(backoff);
-    setTimeout(Math.toIntExact(backoff));
-    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-    skipPersistence();
-    throw new ProcedureSuspendedException();
+    throw suspend(Math.toIntExact(backoff), false);
   }
 
   protected final void resetRetry() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
new file mode 100644
index 00000000000..536f232338e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A procedure for migrating replication queue data from zookeeper to hbase:replication table.
+ */
+@InterfaceAudience.Private
+public class MigrateReplicationQueueFromZkToTableProcedure
+  extends StateMachineProcedure<MasterProcedureEnv, MigrateReplicationQueueFromZkToTableState>
+  implements GlobalProcedureInterface {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class);
+
+  private static final int MIN_MAJOR_VERSION = 3;
+
+  private List<String> disabledPeerIds;
+
+  private List<Future<?>> futures;
+
+  private ExecutorService executor;
+
+  @Override
+  public String getGlobalId() {
+    return getClass().getSimpleName();
+  }
+
+  private ExecutorService getExecutorService() {
+    if (executor == null) {
+      executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+        .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
+    }
+    return executor;
+  }
+
+  private void shutdownExecutorService() {
+    if (executor != null) {
+      executor.shutdown();
+      executor = null;
+    }
+  }
+
+  private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
+    long peerProcCount;
+    try {
+      peerProcCount = env.getMasterServices().getProcedures().stream()
+        .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
+    } catch (IOException e) {
+      LOG.warn("failed to check peer procedure status", e);
+      throw suspend(5000, true);
+    }
+    if (peerProcCount > 0) {
+      LOG.info("There are still {} pending peer procedures, will sleep and check later",
+        peerProcCount);
+      throw suspend(10_000, true);
+    }
+    LOG.info("No pending peer procedures found, continue...");
+  }
+
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env,
+    MigrateReplicationQueueFromZkToTableState state)
+    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    switch (state) {
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
+        waitUntilNoPeerProcedure(env);
+        List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
+        if (peers.isEmpty()) {
+          LOG.info("No active replication peer found, delete old replication queue data and quit");
+          ZKReplicationQueueStorageForMigration oldStorage =
+            new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(),
+              env.getMasterConfiguration());
+          try {
+            oldStorage.deleteAllData();
+          } catch (KeeperException e) {
+            LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
+            // throw explicitly so a failed delete can never fall through to NO_MORE_STATE
+            throw suspend(10_000, true);
+          }
+          return Flow.NO_MORE_STATE;
+        }
+        // we do not care about peers which have already been disabled, as later we do not need
+        // to enable them
+        disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
+          .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
+        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
+        return Flow.HAS_MORE_STATE;
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
+        for (String peerId : disabledPeerIds) {
+          addChildProcedure(new DisablePeerProcedure(peerId));
+        }
+        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
+        return Flow.HAS_MORE_STATE;
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
+        if (futures != null) {
+          // wait until all futures done
+          long notDone = futures.stream().filter(f -> !f.isDone()).count();
+          if (notDone == 0) {
+            boolean succ = true;
+            for (Future<?> future : futures) {
+              try {
+                future.get();
+              } catch (Exception e) {
+                succ = false;
+                LOG.warn("Failed to migrate", e);
+              }
+            }
+            if (succ) {
+              shutdownExecutorService();
+              setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+              return Flow.HAS_MORE_STATE;
+            }
+            // reschedule to retry migration again
+            futures = null;
+          } else {
+            LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
+            throw suspend(10_000, true);
+          }
+        }
+        try {
+          futures = env.getReplicationPeerManager()
+            .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
+        } catch (IOException e) {
+          LOG.warn("failed to submit migration tasks", e);
+          throw suspend(10_000, true);
+        }
+        throw suspend(10_000, true);
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
+        long rsWithLowerVersion =
+          env.getMasterServices().getServerManager().getOnlineServers().values().stream()
+            .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count();
+        if (rsWithLowerVersion == 0) {
+          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
+          return Flow.HAS_MORE_STATE;
+        }
+        LOG.info("There are still {} region servers which have a major version less than {}, "
+          + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
+        throw suspend(10_000, true);
+      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
+        for (String peerId : disabledPeerIds) {
+          addChildProcedure(new EnablePeerProcedure(peerId));
+        }
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  @Override
+  protected void rollbackState(MasterProcedureEnv env,
+    MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected MigrateReplicationQueueFromZkToTableState getState(int stateId) {
+    return MigrateReplicationQueueFromZkToTableState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected MigrateReplicationQueueFromZkToTableState getInitialState() {
+    return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    MigrateReplicationQueueFromZkToTableStateData.Builder builder =
+      MigrateReplicationQueueFromZkToTableStateData.newBuilder();
+    if (disabledPeerIds != null) {
+      builder.addAllDisabledPeerId(disabledPeerIds);
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    MigrateReplicationQueueFromZkToTableStateData data =
+      serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class);
+    disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 78b97620c01..c358ec164e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.master.TableStateManager;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -152,12 +154,36 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     }
   }
 
+  /** Whether an unfinished queue migration, which did not schedule us, forbids running now. */
+  private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException {
+    long parentId = getParentProcId();
+    boolean scheduledByMigration = parentId != Procedure.NO_PROC_ID
+      && env.getMasterServices().getMasterProcedureExecutor().getProcedure(parentId)
+          instanceof MigrateReplicationQueueFromZkToTableProcedure;
+    if (scheduledByMigration) {
+      // our parent is the migration procedure itself, so we must not fail
+      return false;
+    }
+    return env.getMasterServices().getProcedures().stream().anyMatch(
+      p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure && !p.isFinished());
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
     throws ProcedureSuspendedException, InterruptedException {
     switch (state) {
       case PRE_PEER_MODIFICATION:
         try {
+          if (shouldFailForMigrating(env)) {
+            LOG.info("There is a pending {}, give up execution of {}",
+              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
+              getClass().getName());
+            setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
+              new DoNotRetryIOException("There is a pending "
+                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
+            releaseLatch(env);
+            return Flow.NO_MORE_STATE;
+          }
           prePeerModification(env);
         } catch (IOException e) {
           LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 0a1dbf848bd..81f569c3f9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -21,14 +21,18 @@ import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -39,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterServices;
@@ -49,17 +54,24 @@ import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
 import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -106,7 +118,7 @@ public class ReplicationPeerManager {
   private final Configuration conf;
 
   @FunctionalInterface
-  private interface ReplicationQueueStorageInitializer {
+  interface ReplicationQueueStorageInitializer {
 
     void initialize() throws IOException;
   }
@@ -138,6 +150,10 @@ public class ReplicationPeerManager {
     }
   }
 
+  private void initializeQueueStorage() throws IOException {
+    queueStorageInitializer.initialize(); // simply delegates to the initializer
+  }
+
   void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
     throws ReplicationException, IOException {
     if (peerId.contains("-")) {
@@ -152,7 +168,7 @@ public class ReplicationPeerManager {
     }
 
     // lazy create table
-    queueStorageInitializer.initialize();
+    initializeQueueStorage();
     // make sure that there is no queues with the same peer id. This may happen when we create a
     // peer with the same id with a old deleted peer. If the replication queues for the old peer
     // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
@@ -699,4 +715,88 @@ public class ReplicationPeerManager {
   public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
     return replicationLogCleanerBarrier;
   }
+
+  /** Builds the new queue data, keeping for each wal group the offset of its earliest wal. */
+  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
+    Map<String, ReplicationGroupOffset> offsetByGroup = new HashMap<>();
+    zkData.getWalOffsets().forEach((walName, position) -> {
+      String group = AbstractFSWALProvider.getWALPrefixFromWALName(walName);
+      ReplicationGroupOffset existing = offsetByGroup.get(group);
+      if (existing == null) {
+        offsetByGroup.put(group, new ReplicationGroupOffset(walName, position));
+        return;
+      }
+      // a group is represented by its wal with the smallest timestamp
+      long existingTs = AbstractFSWALProvider.getTimestamp(existing.getWal());
+      long candidateTs = AbstractFSWALProvider.getTimestamp(walName);
+      if (candidateTs < existingTs) {
+        offsetByGroup.put(group, new ReplicationGroupOffset(walName, position));
+      }
+    });
+    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(offsetByGroup));
+  }
+
+  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
+    throws Exception {
+    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
+      oldQueueStorage.listAllQueues();
+    Pair<ServerName, List<ZkReplicationQueueData>> entry;
+    while ((entry = iter.next()) != null) {
+      ServerName serverName = entry.getFirst();
+      // only the queues whose peer id is present in peers are converted and written
+      List<ReplicationQueueData> converted = entry.getSecond().stream()
+        .filter(data -> peers.containsKey(data.getQueueId().getPeerId())).map(this::convert)
+        .collect(Collectors.toList());
+      queueStorage.batchUpdateQueues(serverName, converted);
+    }
+  }
+
+  /** Writes every last pushed sequence id whose peer id is present in peers to queueStorage. */
+  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
+    throws Exception {
+    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
+    List<ZkLastPushedSeqId> batch;
+    // a null batch marks the end of the iteration
+    while ((batch = iter.next()) != null) {
+      List<ZkLastPushedSeqId> toMigrate = batch.stream()
+        .filter(seqId -> peers.containsKey(seqId.getPeerId())).collect(Collectors.toList());
+      queueStorage.batchUpdateLastSequenceIds(toMigrate);
+    }
+  }
+
+  /** Writes the hfile refs of every peer id that is present in peers to queueStorage. */
+  private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
+    throws Exception {
+    MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
+    Pair<String, List<String>> entry;
+    while ((entry = iter.next()) != null) {
+      String peerId = entry.getFirst();
+      if (!peers.containsKey(peerId)) {
+        continue;
+      }
+      queueStorage.batchUpdateHFileRefs(peerId, entry.getSecond());
+    }
+  }
+
+  /** Submit the migration tasks to the given {@code executor} and return the futures. */
+  List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
+    throws IOException {
+    // the queue table is created lazily by addPeer, which we do not call, so initialize it here
+    initializeQueueStorage();
+    ZKReplicationQueueStorageForMigration oldStorage =
+      new ZKReplicationQueueStorageForMigration(zookeeper, conf);
+    Future<?> queuesTask = executor.submit(() -> {
+      migrateQueues(oldStorage);
+      return null;
+    });
+    Future<?> lastPushedSeqIdsTask = executor.submit(() -> {
+      migrateLastPushedSeqIds(oldStorage);
+      return null;
+    });
+    Future<?> hfileRefsTask = executor.submit(() -> {
+      migrateHFileRefs(oldStorage);
+      return null;
+    });
+    return Arrays.asList(queuesTask, lastPushedSeqIdsTask, hfileRefsTask);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 2de10cb2778..89658903538 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -236,6 +237,19 @@ public class TransitPeerSyncReplicationStateProcedure
     switch (state) {
       case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
         try {
+          if (
+            env.getMasterServices().getProcedures().stream()
+              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
+              .anyMatch(p -> !p.isFinished())
+          ) {
+            LOG.info("There is a pending {}, give up execution of {}",
+              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
+              getClass().getSimpleName());
+            setFailure("master-transit-peer-sync-replication-state",
+              new DoNotRetryIOException("There is a pending "
+                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
+            return Flow.NO_MORE_STATE;
+          }
           preTransit(env);
         } catch (IOException e) {
           LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
new file mode 100644
index 00000000000..1b0f727a072
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, LargeTests.class })
+public class TestMigrateReplicationQueue extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class);
+
+  private int disableAndInsert() throws Exception {
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID2); // the peer stays disabled while loading
+    return UTIL1.loadTable(htable1, famName);
+  }
+
+  private String getQueuesZNode() throws IOException {
+    Configuration conf = UTIL1.getConfiguration();
+    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+    String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
+      conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
+        ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
+    return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs"));
+  }
+
+  private void mockData() throws Exception {
+    // delete the replication queue table to simulate upgrading from an older version of hbase
+    TableName replicationQueueTableName = TableName
+      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
+        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
+      .getReplicationPeerManager().getQueueStorage().listAllQueues();
+    assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size());
+    UTIL1.getAdmin().disableTable(replicationQueueTableName);
+    UTIL1.getAdmin().deleteTable(replicationQueueTableName);
+    // shutdown the hbase cluster
+    UTIL1.shutdownMiniHBaseCluster();
+    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+    String queuesZNode = getQueuesZNode();
+    for (ReplicationQueueData queueData : queueDatas) {
+      String replicatorZNode =
+        ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString());
+      String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId());
+      assertEquals(1, queueData.getOffsets().size()); // getOnlyElement needs exactly one
+      ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values());
+      String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal());
+      ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset()));
+    }
+  }
+
+  @Test
+  public void testMigrate() throws Exception {
+    int count = disableAndInsert();
+    mockData();
+    restartSourceCluster(1);
+    UTIL1.waitFor(60000,
+      () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+        .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny()
+        .map(Procedure::isSuccess).orElse(false));
+    TableName replicationQueueTableName = TableName
+      .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
+        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+    assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName));
+    ZKWatcher zk = UTIL1.getZooKeeperWatcher();
+    assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode())); // presumably -1: no such znode
+    // wait until SCP finishes, which means we can finish the claim queue operation
+    UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+      .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
+    List<ReplicationQueueData> queueDatas = UTIL1.getMiniHBaseCluster().getMaster()
+      .getReplicationPeerManager().getQueueStorage().listAllQueues();
+    assertEquals(1, queueDatas.size());
+    // should have 1 recovered queue, as we haven't replicated anything out so there is no queue
+    // data for the new alive region server
+    assertTrue(queueDatas.get(0).getId().isRecovered());
+    assertEquals(1, queueDatas.get(0).getOffsets().size());
+    // the peer is still disabled, so no data has been replicated
+    assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2));
+    assertEquals(0, HBaseTestingUtil.countRows(htable2));
+    // enable peer, and make sure the replication can continue correctly
+    UTIL1.getAdmin().enableReplicationPeer(PEER_ID2);
+    waitForReplication(count, 100); // count comes from disableAndInsert()
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
new file mode 100644
index 00000000000..752abc380b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionServerList;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMigrateReplicationQueueFromZkToTableProcedure {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  public static final class HMasterForTest extends HMaster { // master using ServerManagerForTest
+
+    public HMasterForTest(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    protected ServerManager createServerManager(MasterServices master, RegionServerList storage)
+      throws IOException {
+      setupClusterConnection();
+      return new ServerManagerForTest(master, storage);
+    }
+  }
+
+  private static final ConcurrentMap<ServerName, ServerMetrics> EXTRA_REGION_SERVERS =
+    new ConcurrentHashMap<>();
+
+  public static final class ServerManagerForTest extends ServerManager {
+
+    public ServerManagerForTest(MasterServices master, RegionServerList storage) {
+      super(master, storage);
+    }
+
+    @Override
+    public Map<ServerName, ServerMetrics> getOnlineServers() {
+      Map<ServerName, ServerMetrics> map = new HashMap<>(super.getOnlineServers());
+      map.putAll(EXTRA_REGION_SERVERS); // also report the fake servers registered by the tests
+      return map;
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.startMiniCluster(
+      StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EXTRA_REGION_SERVERS.clear(); // a failed test may have left fake old-version servers behind
+    Admin admin = UTIL.getAdmin();
+    for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
+      admin.removeReplicationPeer(pd.getPeerId());
+    }
+  }
+
+  // latches used by the tests to observe and then release FakePeerProcedure
+  private static CountDownLatch PEER_PROC_ARRIVE;
+  private static CountDownLatch PEER_PROC_RESUME;
+
+  public static final class FakePeerProcedure extends Procedure<MasterProcedureEnv>
+    implements PeerProcedureInterface {
+
+    private String peerId;
+
+    public FakePeerProcedure() {
+    }
+
+    public FakePeerProcedure(String peerId) {
+      this.peerId = peerId;
+    }
+
+    @Override
+    public String getPeerId() {
+      return peerId;
+    }
+
+    @Override
+    public PeerOperationType getPeerOperationType() {
+      return PeerOperationType.UPDATE_CONFIG;
+    }
+
+    @Override
+    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      PEER_PROC_ARRIVE.countDown(); // tell the test that we are executing
+      PEER_PROC_RESUME.await(); // stay in execute until the test releases us
+      return null;
+    }
+
+    @Override
+    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean abort(MasterProcedureEnv env) {
+      return false;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    }
+  }
+
+  @Test
+  public void testWaitUntilNoPeerProcedure() throws Exception {
+    PEER_PROC_ARRIVE = new CountDownLatch(1);
+    PEER_PROC_RESUME = new CountDownLatch(1);
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    procExec.submitProcedure(new FakePeerProcedure("1"));
+    PEER_PROC_ARRIVE.await();
+    MigrateReplicationQueueFromZkToTableProcedure proc =
+      new MigrateReplicationQueueFromZkToTableProcedure();
+    procExec.submitProcedure(proc);
+    // make sure we will wait until there is no peer related procedures before proceeding
+    UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+    // continue and make sure we can finish successfully
+    PEER_PROC_RESUME.countDown();
+    UTIL.waitFor(30000, () -> proc.isSuccess());
+  }
+
+  @Test
+  public void testDisablePeerAndWaitUpgrading() throws Exception {
+    String peerId = "2";
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
+      .setReplicateAllUserTables(true).build();
+    UTIL.getAdmin().addReplicationPeer(peerId, rpc);
+    // put a fake region server to simulate that there are still region servers with older version
+    ServerMetrics metrics = mock(ServerMetrics.class);
+    when(metrics.getVersion()).thenReturn("2.5.0");
+    EXTRA_REGION_SERVERS
+      .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+    MigrateReplicationQueueFromZkToTableProcedure proc =
+      new MigrateReplicationQueueFromZkToTableProcedure();
+    procExec.submitProcedure(proc);
+    // wait until we reach the wait upgrading state
+    UTIL.waitFor(30000,
+      () -> proc.getCurrentStateId()
+          == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
+        && proc.getState() == ProcedureState.WAITING_TIMEOUT);
+    // make sure the peer is disabled for migrating
+    assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
+
+    // the procedure should finish successfully
+    EXTRA_REGION_SERVERS.clear();
+    UTIL.waitFor(30000, () -> proc.isSuccess());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java
new file mode 100644
index 00000000000..8d1a975400f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedureRecovery.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+    return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
+  }
+
+  private String getHFileRefsZNode() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    ZKWatcher zk = UTIL.getZooKeeperWatcher();
+    String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode,
+      conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE,
+        ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT));
+    return ZNodePaths.joinZNode(replicationZNode,
+      conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+        ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT));
+  }
+
+  @Test
+  public void testRecoveryAndDoubleExecution() throws Exception {
+    String peerId = "2";
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
+      .setReplicateAllUserTables(true).build();
+    UTIL.getAdmin().addReplicationPeer(peerId, rpc);
+
+    // here we only test a simple migration, more complicated migration will be tested in other UTs,
+    // such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateQueuesFromZk
+    String hfileRefsZNode = getHFileRefsZNode();
+    String hfile = "hfile";
+    String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile);
+    ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode);
+
+    ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+    // Start the migration procedure && kill the executor
+    long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure());
+    // Restart the executor and execute the step twice
+    MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
+    // Validate the migration result
+    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+    ReplicationQueueStorage queueStorage =
+      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+    List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
+    assertThat(hfiles, Matchers.<List<String>> both(hasItem(hfile)).and(hasSize(1)));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
new file mode 100644
index 00000000000..73915e856ea
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationPeerManagerMigrateQueuesFromZk {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class);
+
+  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static ExecutorService EXECUTOR;
+
+  ConcurrentMap<String, ReplicationPeerDescription> peers;
+
+  private ReplicationPeerStorage peerStorage;
+
+  private ReplicationQueueStorage queueStorage;
+
+  private ReplicationQueueStorageInitializer queueStorageInitializer;
+
+  private ReplicationPeerManager manager;
+
+  private int nServers = 10;
+
+  private int nPeers = 10;
+
+  private int nRegions = 100;
+
+  private ServerName deadServerName;
+
+  @Rule
+  public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster(1);
+    EXECUTOR = Executors.newFixedThreadPool(3,
+      new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d")
+        .build());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    EXECUTOR.shutdownNow();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    peerStorage = mock(ReplicationPeerStorage.class);
+    TableName tableName = tableNameRule.getTableName();
+    UTIL.getAdmin()
+      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
+    queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName);
+    queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class);
+    peers = new ConcurrentHashMap<>();
+    deadServerName =
+      ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime());
+    manager = new ReplicationPeerManager(peerStorage, queueStorage, peers, conf, "cluster",
+      queueStorageInitializer);
+  }
+
+  private Map<String, Set<String>> prepareData() throws Exception {
+    ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration(
+      UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+    TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName);
+    Map<String, Set<String>> encodedName2PeerIds = TestZKReplicationQueueStorage
+      .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10);
+    TestZKReplicationQueueStorage.mockHFileRefs(storage, 10);
+    return encodedName2PeerIds;
+  }
+
+  @Test
+  public void testNoPeers() throws Exception {
+    prepareData();
+    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
+      future.get(1, TimeUnit.MINUTES);
+    }
+    // should have called initializer
+    verify(queueStorageInitializer).initialize();
+    // should not have migrated any data since there is no peer
+    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) {
+      assertEquals(0, HBaseTestingUtil.countRows(table));
+    }
+  }
+
+  @Test
+  public void testMigrate() throws Exception {
+    Map<String, Set<String>> encodedName2PeerIds = prepareData();
+    // add all peers so we will migrate them all
+    for (int i = 0; i < nPeers; i++) {
+      // value is not used in this test, so just add a mock
+      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
+    }
+    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
+      future.get(1, TimeUnit.MINUTES);
+    }
+    // should have called initializer
+    verify(queueStorageInitializer).initialize();
+    List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
+    // there should be two empty queues so minus 2
+    assertEquals(2 * nServers - 2, queueDatas.size());
+    for (ReplicationQueueData queueData : queueDatas) {
+      assertEquals("peer_0", queueData.getId().getPeerId());
+      assertEquals(1, queueData.getOffsets().size());
+      String walGroup = queueData.getId().getServerWALsBelongTo().toString();
+      ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup);
+      assertEquals(0, offset.getOffset());
+      assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal());
+    }
+    // there is no method in ReplicationQueueStorage that can list all the last pushed sequence ids
+    try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName());
+      ResultScanner scanner =
+        table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) {
+      for (int i = 0; i < 2; i++) {
+        Result result = scanner.next();
+        String peerId = Bytes.toString(result.getRow());
+        assertEquals(nRegions, result.size());
+        for (Cell cell : result.rawCells()) {
+          String encodedRegionName = Bytes.toString(cell.getQualifierArray(),
+            cell.getQualifierOffset(), cell.getQualifierLength());
+          encodedName2PeerIds.get(encodedRegionName).remove(peerId);
+          long seqId =
+            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+          assertEquals(i + 1, seqId);
+        }
+      }
+      encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> {
+        assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty());
+      });
+      assertNull(scanner.next());
+    }
+    for (int i = 0; i < nPeers; i++) {
+      List<String> refs = queueStorage.getReplicableHFiles("peer_" + i);
+      assertEquals(i, refs.size());
+      Set<String> refsSet = new HashSet<>(refs);
+      for (int j = 0; j < i; j++) {
+        assertTrue(refsSet.remove("hfile-" + j));
+      }
+      assertThat(refsSet, empty());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index b6157ac0f18..27477527277 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -216,7 +216,7 @@ public class TestReplicationBase {
     conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
   }
 
-  static void restartSourceCluster(int numSlaves) throws Exception {
+  protected static void restartSourceCluster(int numSlaves) throws Exception {
     Closeables.close(hbaseAdmin, true);
     Closeables.close(htable1, true);
     UTIL1.shutdownMiniHBaseCluster();
diff --git a/pom.xml b/pom.xml
index fd8312e3f28..2c3043ea436 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1028,13 +1028,18 @@
         <artifactId>hbase-hadoop-compat</artifactId>
         <version>${project.version}</version>
         <type>test-jar</type>
-        <scope>test</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-replication</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-replication</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-balancer</artifactId>