Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/18 13:57:31 UTC

[hbase] 08/11: HBASE-27217 Revisit the DumpReplicationQueues tool (#4810)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 96f15b3d2d3fddd0f257c627473c293a95e6505a
Author: LiangJun He <20...@163.com>
AuthorDate: Sun Nov 13 22:03:36 2022 +0800

    HBASE-27217 Revisit the DumpReplicationQueues tool (#4810)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../regionserver/DumpReplicationQueues.java        | 240 +++++++++++++--------
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    |  20 ++
 .../regionserver/TestDumpReplicationQueues.java    | 159 +++++++++-----
 3 files changed, 284 insertions(+), 135 deletions(-)
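
The tool touched by this change is a standard Hadoop Tool, normally run as
"hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]"
per the class javadoc below. A minimal sketch of an equivalent programmatic
invocation (illustrative only: the class is @InterfaceAudience.Private, and the
driver class name here is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues;
    import org.apache.hadoop.util.ToolRunner;

    public class DumpQueuesDriver {
      public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the classpath.
        Configuration conf = HBaseConfiguration.create();
        // Same effect as passing --distributed --hdfs on the command line.
        int exitCode = ToolRunner.run(conf, new DumpReplicationQueues(),
          new String[] { "--distributed", "--hdfs" });
        System.exit(exitCode);
      }
    }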

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 98d0a55fbc4..b284e3f6837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -40,28 +44,33 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKDump;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
 
 /**
- * TODO: reimplement this tool
  * <p/>
  * Provides information about the existing states of replication, replication peers and queues.
  * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
  * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
- * usage by the replication queues (note: can be overestimated).
+ * usage by the replication queues (note: can be overestimated). In the new version, we
+ * reimplemented the DumpReplicationQueues tool to support obtaining information from the
+ * replication table.
  */
 @InterfaceAudience.Private
 public class DumpReplicationQueues extends Configured implements Tool {
@@ -185,7 +194,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
     System.err.println("General Options:");
     System.err.println(" -h|--h|--help  Show this help and exit.");
     System.err.println(" --distributed  Poll each RS and print its own replication queue. "
-      + "Default only polls ZooKeeper");
+      + "Default only polls replication table.");
     System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
       + " It could be overestimated if replicating to multiple peers."
       + " --distributed flag is also needed.");
@@ -201,13 +210,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
     Connection connection = ConnectionFactory.createConnection(conf);
     Admin admin = connection.getAdmin();
 
-    ZKWatcher zkw =
-      new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
-        new WarnOnlyAbortable(), true);
-
     try {
-      // Our zk watcher
-      LOG.info("Our Quorum: " + zkw.getQuorum());
       List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
       if (replicatedTableCFs.isEmpty()) {
         LOG.info("No tables with a configured replication peer were found.");
@@ -229,21 +232,72 @@ public class DumpReplicationQueues extends Configured implements Tool {
         LOG.info("Found [--distributed], will poll each RegionServer.");
         Set<String> peerIds =
           peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
-        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
+        System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
         System.out.println(dumpReplicationSummary());
       } else {
-        // use ZK instead
-        System.out.print("Dumping replication znodes via ZooKeeper:");
-        System.out.println(ZKDump.getReplicationZnodesDump(zkw));
+        // use replication table instead
+        System.out.println("Dumping replication info via replication table.");
+        System.out.println(dumpReplicationViaTable(connection, conf));
       }
       return (0);
     } catch (IOException e) {
       return (-1);
     } finally {
-      zkw.close();
+      connection.close();
     }
   }
 
+  public String dumpReplicationViaTable(Connection connection, Configuration conf)
+    throws ReplicationException, IOException {
+    StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+    // The dump info format is as follows:
+    // peers:
+    // peers/1: zk1:2181:/hbase
+    // peers/1/peer-state: ENABLED
+    // rs:
+    // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
+    // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
+    // hfile-refs:
+    // hfile-refs/1/hfile1,hfile2
+    // hfile-refs/2/hfile3,hfile4
+    String peersKey = "peers";
+    sb.append(peersKey).append(": ").append("\n");
+    List<ReplicationPeerDescription> repPeerDescs = connection.getAdmin().listReplicationPeers();
+    for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
+      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
+        .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
+      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
+        .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
+    }
+
+    List<ReplicationQueueData> repQueueDataList = queueStorage.listAllQueues();
+    String rsKey = "rs";
+    sb.append(rsKey).append(": ").append("\n");
+    for (ReplicationQueueData repQueueData : repQueueDataList) {
+      String peerId = repQueueData.getId().getPeerId();
+      for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
+        .entrySet()) {
+        sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
+          .append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
+          .append("\n");
+      }
+    }
+
+    List<String> peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
+    String hfileKey = "hfile-refs";
+    sb.append(hfileKey).append(": ").append("\n");
+    for (String peerId : peerIds) {
+      List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
+      sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
+        .append("\n");
+    }
+
+    return sb.toString();
+  }
+
   public String dumpReplicationSummary() {
     StringBuilder sb = new StringBuilder();
     if (!deletedQueues.isEmpty()) {
@@ -294,71 +348,103 @@ public class DumpReplicationQueues extends Configured implements Tool {
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
-    ReplicationQueueStorage queueStorage;
+  public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
+    Configuration conf) throws Exception {
     StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+    Set<ServerName> liveRegionServers =
+      connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
 
-    // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
-    // zkw.getZNodePaths().rsZNode)
-    // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-    //
-    // Loops each peer on each RS and dumps the queues
-    // List<ServerName> regionservers = queueStorage.getListOfReplicators();
-    // if (regionservers == null || regionservers.isEmpty()) {
-    // return sb.toString();
-    // }
-    // for (ServerName regionserver : regionservers) {
-    // List<String> queueIds = queueStorage.getAllQueues(regionserver);
-    // if (!liveRegionServers.contains(regionserver)) {
-    // deadRegionServers.add(regionserver.getServerName());
-    // }
-    // for (String queueId : queueIds) {
-    // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-    // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
-    // Collections.sort(wals);
-    // if (!peerIds.contains(queueInfo.getPeerId())) {
-    // deletedQueues.add(regionserver + "/" + queueId);
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
-    // } else {
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
-    // }
-    // }
-    // }
+    List<ServerName> regionServers = queueStorage.listAllReplicators();
+    if (regionServers == null || regionServers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionServer : regionServers) {
+      List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);
+
+      if (!liveRegionServers.contains(regionServer)) {
+        deadRegionServers.add(regionServer.getServerName());
+      }
+      for (ReplicationQueueId queueId : queueIds) {
+        List<String> tmpWals = new ArrayList<>();
+        // wals
+        AbstractFSWALProvider
+          .getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
+          .map(Path::toString).forEach(tmpWals::add);
+
+        // old wals
+        AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
+          queueId.getServerWALsBelongTo(), URLEncoder
+            .encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).forEach(tmpWals::add);
+
+        Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
+        // filter out the wal files that should replicate
+        List<String> wals = new ArrayList<>();
+        for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
+          ReplicationGroupOffset offset = entry.getValue();
+          for (String wal : tmpWals) {
+            if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
+              wals.add(wal);
+            }
+          }
+        }
+        Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
+        if (!peerIds.contains(queueId.getPeerId())) {
+          deletedQueues.add(regionServer + "/" + queueId);
+          sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
+        } else {
+          sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
+        }
+      }
+    }
     return sb.toString();
   }
 
-  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
-    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
-    boolean hdfs) throws Exception {
+  private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
+    List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
+    throws Exception {
     StringBuilder sb = new StringBuilder();
 
-    List<ServerName> deadServers;
-
-    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
-    sb.append("    Queue znode: " + queueId + "\n");
-    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
-    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
-    deadServers = queueInfo.getDeadRegionServers();
-    if (deadServers.isEmpty()) {
-      sb.append("    No dead RegionServers found in this queue." + "\n");
+    sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
+    sb.append("    Queue id: " + queueId + "\n");
+    sb.append("    PeerID: " + queueId.getPeerId() + "\n");
+    sb.append("    Recovered: " + queueId.isRecovered() + "\n");
+    // In the new version, we only record the first dead RegionServer in the queueId.
+    if (queueId.getSourceServerName().isPresent()) {
+      sb.append("    Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
     } else {
-      sb.append("    Dead RegionServers: " + deadServers + "\n");
+      sb.append("    No dead RegionServer found in this queue." + "\n");
     }
     sb.append("    Was deleted: " + isDeleted + "\n");
     sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
-    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
-
-    for (String wal : wals) {
-      // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
-      // sb.append(" Replication position for " + wal + ": "
-      // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
+    peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
+
+    for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
+      String walGroup = entry.getKey();
+      ReplicationGroupOffset offset = entry.getValue();
+      for (String wal : wals) {
+        long position = 0;
+        if (offset.getWal().equals(wal)) {
+          position = offset.getOffset();
+        }
+        sb.append(
+          " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
+        if (position == 0) {
+          sb.append("0 (not started or nothing to replicate)");
+        } else if (position > 0) {
+          sb.append(position);
+        }
+        sb.append("\n");
+      }
     }
 
     if (hdfs) {
       FileSystem fs = FileSystem.get(getConf());
       sb.append("    Total size of WALs on HDFS for this queue: "
-        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
     }
     return sb.toString();
   }
@@ -366,8 +452,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
   /**
    * return total size in bytes from a list of WALs
    */
-  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
-    throws IOException {
+  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
     long size = 0;
     FileStatus fileStatus;
 
@@ -389,19 +474,4 @@ public class DumpReplicationQueues extends Configured implements Tool {
     totalSizeOfWALs += size;
     return size;
   }
-
-  private static class WarnOnlyAbortable implements Abortable {
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.warn("DumpReplicationQueue received abort, ignoring.  Reason: " + why);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(e.toString(), e);
-      }
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1b387f33ecc..dce58dbfae4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -379,6 +379,26 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
     return archivedWalFiles;
   }
 
+  /**
+   * List all the WAL files for the given server name.
+   */
+  public static List<Path> getWALFiles(Configuration c, ServerName serverName) throws IOException {
+    Path walRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
+    FileSystem fs = walRoot.getFileSystem(c);
+    List<Path> walFiles = new ArrayList<>();
+    Path walDir = new Path(walRoot, serverName.toString());
+    try {
+      for (FileStatus status : fs.listStatus(walDir)) {
+        if (status.isFile()) {
+          walFiles.add(status.getPath());
+        }
+      }
+    } catch (FileNotFoundException e) {
+      LOG.info("WAL dir {} does not exist", walDir);
+    }
+    return walFiles;
+  }
+
   /**
    * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
    * this method ignores the format of the logfile component. Current format: [base directory for
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
index 3475ae5c192..3e1dc624fe7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
@@ -17,34 +17,43 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 /**
  * Tests for DumpReplicationQueues tool
  */
-// TODO: reimplement
-@Ignore
 @Category({ ReplicationTests.class, SmallTests.class })
 public class TestDumpReplicationQueues {
 
@@ -52,49 +61,99 @@ public class TestDumpReplicationQueues {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);
 
-  /**
-   * Makes sure dumpQueues returns wals znodes ordered chronologically.
-   * @throws Exception if dumpqueues finds any error while handling list of znodes.
-   */
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+  private static Configuration CONF;
+  private static FileSystem FS = null;
+  private Path root;
+  private Path logDir;
+  @Rule
+  public final TestName name = new TestName();
+
+  @Before
+  public void setup() throws Exception {
+    UTIL.startMiniCluster(3);
+    CONF = UTIL.getConfiguration();
+    TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
+    UTIL.getAdmin()
+      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
+    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
+    FS = FileSystem.get(CONF);
+    root = UTIL.getDataTestDirOnTestFS("hbase");
+    logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
+    FS.mkdirs(logDir);
+    CommonFSUtils.setRootDir(CONF, root);
+    CommonFSUtils.setWALRootDir(CONF, root);
+  }
+
   @Test
-  public void testDumpReplicationReturnsWalSorted() throws Exception {
-    Configuration config = HBaseConfiguration.create();
-    ZKWatcher zkWatcherMock = mock(ZKWatcher.class);
-    ZNodePaths zNodePath = new ZNodePaths(config);
-    RecoverableZooKeeper recoverableZooKeeperMock = mock(RecoverableZooKeeper.class);
-    when(zkWatcherMock.getRecoverableZooKeeper()).thenReturn(recoverableZooKeeperMock);
-    when(zkWatcherMock.getZNodePaths()).thenReturn(zNodePath);
-    List<String> nodes = new ArrayList<>();
-    String server = "rs1,60030," + EnvironmentEdgeManager.currentTime();
-    nodes.add(server);
-    when(recoverableZooKeeperMock.getChildren("/hbase/rs", null)).thenReturn(nodes);
-    when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs", null)).thenReturn(nodes);
-    List<String> queuesIds = new ArrayList<>();
-    queuesIds.add("1");
-    when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server, null))
-      .thenReturn(queuesIds);
-    List<String> wals = new ArrayList<>();
-    wals.add("rs1%2C60964%2C1549394085556.1549394101427");
-    wals.add("rs1%2C60964%2C1549394085556.1549394101426");
-    wals.add("rs1%2C60964%2C1549394085556.1549394101428");
-    when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server + "/1", null))
-      .thenReturn(wals);
+  public void testDumpReplication() throws Exception {
+    String peerId = "1";
+    String serverNameStr = "rs1,12345,123";
+    addPeer(peerId, "hbase");
+    ServerName serverName = ServerName.valueOf(serverNameStr);
+    String walName = "rs1%2C12345%2C123.10";
+    Path walPath = new Path(logDir, serverNameStr + "/" + walName);
+    FS.createNewFile(walPath);
+
+    ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
+    queueStorage.setOffset(queueId, "wal-group",
+      new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
+      Collections.emptyMap());
+
     DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
     Set<String> peerIds = new HashSet<>();
-    peerIds.add("1");
-    dumpQueues.setConf(config);
-    String dump = dumpQueues.dumpQueues(zkWatcherMock, peerIds, false);
+    peerIds.add(peerId);
+    List<String> wals = new ArrayList<>();
+    wals.add("rs1%2C12345%2C123.12");
+    wals.add("rs1%2C12345%2C123.15");
+    wals.add("rs1%2C12345%2C123.11");
+    for (String wal : wals) {
+      Path wPath = new Path(logDir, serverNameStr + "/" + wal);
+      FS.createNewFile(wPath);
+    }
+
+    String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
+    assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0);
+    assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0);
+    // verify that the wals are returned in chronological order
     String[] parsedDump = dump.split("Replication position for");
-    assertEquals("Parsed dump should have 4 parts.", 4, parsedDump.length);
-    assertTrue(
-      "First wal should be rs1%2C60964%2C1549394085556.1549394101426, but got: " + parsedDump[1],
-      parsedDump[1].indexOf("rs1%2C60964%2C1549394085556.1549394101426") >= 0);
-    assertTrue(
-      "Second wal should be rs1%2C60964%2C1549394085556.1549394101427, but got: " + parsedDump[2],
-      parsedDump[2].indexOf("rs1%2C60964%2C1549394085556.1549394101427") >= 0);
-    assertTrue(
-      "Third wal should be rs1%2C60964%2C1549394085556.1549394101428, but got: " + parsedDump[3],
-      parsedDump[3].indexOf("rs1%2C60964%2C1549394085556.1549394101428") >= 0);
+    assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1],
+      parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0);
+    assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2],
+      parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0);
+    assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3],
+      parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0);
+    assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4],
+      parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0);
+
+    Path file1 = new Path("testHFile1");
+    Path file2 = new Path("testHFile2");
+    List<Pair<Path, Path>> files = new ArrayList<>(1);
+    files.add(new Pair<>(null, file1));
+    files.add(new Pair<>(null, file2));
+    queueStorage.addHFileRefs(peerId, files);
+    // test dumping replication info via the replication table
+    String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
+    assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0);
+    assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0);
+    assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0);
+  }
+
+  /**
+   * Add a peer
+   */
+  private void addPeer(String peerId, String clusterKey) throws IOException {
+    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
+      .setReplicationEndpointImpl(
+        TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
+    UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
   }
 
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
 }
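
For reference, the non-distributed code path added above reduces to reading the
replication table via ReplicationStorageFactory and printing peers, per-queue WAL
offsets and hfile-refs. A rough sketch of driving it directly (the same call the
test makes; the wrapper class name is made up, and DumpReplicationQueues is
@InterfaceAudience.Private, so this is illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues;

    public class DumpViaTableDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          DumpReplicationQueues tool = new DumpReplicationQueues();
          tool.setConf(conf);
          // Output follows the "peers / rs / hfile-refs" layout documented in
          // dumpReplicationViaTable above.
          System.out.println(tool.dumpReplicationViaTable(connection, conf));
        }
      }
    }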