You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/01/22 02:00:36 UTC

[hbase] 01/02: HBASE-21680 Port HBASE-20194 (Basic Replication WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver) to branch-1

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1fd76bd35a8f69cc21e175c28967ea4c4bad4584
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Fri Jan 18 14:58:34 2019 -0800

    HBASE-21680 Port HBASE-20194 (Basic Replication WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver) to branch-1
    
    HBASE-20193 Basic Replication Web UI - Regionserver
    
    HBASE-20194 Basic Replication WebUI - Master
---
 .../hbase/tmpl/master/MasterStatusTmpl.jamon       |  42 +++++++
 .../hbase/tmpl/master/RegionServerListTmpl.jamon   |  86 +++++++++++--
 .../hbase/tmpl/regionserver/RSStatusTmpl.jamon     |   4 +
 .../tmpl/regionserver/ReplicationStatusTmpl.jamon  | 105 ++++++++++++++++
 .../org/apache/hadoop/hbase/master/HMaster.java    |  21 ++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |  16 +++
 .../hbase/regionserver/ReplicationService.java     |   2 +-
 .../regionserver/ReplicationSourceService.java     |   6 +
 .../replication/regionserver/MetricsSource.java    |  37 ++++--
 .../replication/regionserver/ReplicationLoad.java  |  33 +++--
 .../regionserver/ReplicationSource.java            |  37 ++++++
 .../regionserver/ReplicationSourceInterface.java   |   6 +
 .../ReplicationSourceWALReaderThread.java          |  11 +-
 .../regionserver/ReplicationStatus.java            | 135 +++++++++++++++++++++
 .../hbase/master/TestGetReplicationLoad.java       | 133 ++++++++++++++++++++
 .../hbase/replication/ReplicationSourceDummy.java  |   8 ++
 .../hbase/replication/TestReplicationBase.java     |   1 +
 .../replication/TestReplicationMetricsforUI.java   | 105 ++++++++++++++++
 18 files changed, 750 insertions(+), 38 deletions(-)

diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
index efff6f7..f365221 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
@@ -43,6 +43,7 @@ org.apache.hadoop.hbase.ServerLoad;
 org.apache.hadoop.hbase.ServerName;
 org.apache.hadoop.hbase.client.Admin;
 org.apache.hadoop.hbase.client.HConnectionManager;
+org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 org.apache.hadoop.hbase.HRegionInfo;
 org.apache.hadoop.hbase.master.RegionState;
 org.apache.hadoop.hbase.HTableDescriptor;
@@ -52,6 +53,9 @@ org.apache.hadoop.hbase.tool.Canary;
 org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 org.apache.hadoop.hbase.master.DeadServer;
 org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+org.apache.hadoop.hbase.replication.ReplicationPeer;
+org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
 org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
 org.apache.hadoop.hbase.security.access.AccessControlLists;
 org.apache.hadoop.hbase.quotas.QuotaUtil;
@@ -232,6 +236,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
                 </div>
             </div>
         </section>
+        <section>
+            <h2>Peers</h2>
+            <& peerConfigs &>
+        </section>
         <%if master.getAssignmentManager() != null %>
         <& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&>
         </%if>
@@ -554,3 +562,37 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
 </table>
 </%if>
 </%def>
+
+<%def peerConfigs>
+<%java>
+    Map<String, ReplicationPeerConfig> peers = null;
+    try (ReplicationAdmin admin = new ReplicationAdmin(master.getConfiguration())) {
+        peers = admin.listPeerConfigs();
+    }
+</%java>
+<table class="table table-striped">
+    <tr>
+        <th>Peer Id</th>
+        <th>Cluster Key</th>
+        <th>Bandwidth</th>
+        <th>Table Cfs</th>
+    </tr>
+<%if (peers != null && peers.size() > 0)%>
+    <%for Map.Entry<String, ReplicationPeerConfig> peer : peers.entrySet() %>
+    <%java>
+        String peerId = peer.getKey();
+        ReplicationPeerConfig peerConfig = peer.getValue();
+    </%java>
+    <tr>
+        <td><% peerId %></td>
+        <td><% peerConfig.getClusterKey() %></td>
+        <td><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %></td>
+        <td>
+           <% peerConfig.getTableCFsMap() == null ? "" : ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %>
+        </td>
+    </tr>
+    </%for>
+</%if>
+<tr><td>Total: <% (peers != null) ? peers.size() : 0 %></td></tr>
+</table>
+</%def>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index 29c0f77..8116b28 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -26,6 +26,8 @@ HMaster master;
 <%import>
         java.util.*;
         org.apache.hadoop.hbase.master.HMaster;
+        org.apache.hadoop.hbase.procedure2.util.StringUtils;
+        org.apache.hadoop.hbase.replication.ReplicationLoadSource;
         org.apache.hadoop.hbase.ServerLoad;
         org.apache.hadoop.hbase.ServerName;
         org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -33,6 +35,7 @@ HMaster master;
         org.apache.hadoop.hbase.HTableDescriptor;
         org.apache.hadoop.hbase.HBaseConfiguration;
         org.apache.hadoop.hbase.util.VersionInfo;
+        org.apache.hadoop.hbase.util.Pair;
         org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 </%import>
 
@@ -50,7 +53,8 @@ Arrays.sort(serverNames);
         <li class=""><a href="#tab_memoryStats" data-toggle="tab">Memory</a></li>
         <li class=""><a href="#tab_requestStats" data-toggle="tab">Requests</a></li>
         <li class=""><a href="#tab_storeStats" data-toggle="tab">Storefiles</a></li>
-        <li class=""><a href="#tab_compactStas" data-toggle="tab">Compactions</a></li>
+        <li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
+        <li class=""><a href="#tab_replicationStats" data-toggle="tab">Replications</a></li>
     </ul>
     <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
         <div class="tab-pane active" id="tab_baseStats">
@@ -65,9 +69,12 @@ Arrays.sort(serverNames);
         <div class="tab-pane" id="tab_storeStats">
             <& storeStats; serverNames = serverNames; &>
         </div>
-        <div class="tab-pane" id="tab_compactStas">
+        <div class="tab-pane" id="tab_compactStats">
             <& compactionStats; serverNames = serverNames; &>
         </div>
+        <div class="tab-pane" id="tab_replicationStats">
+            <& replicationStats; serverNames = serverNames; &>
+        </div>
     </div>
 </div>
 
@@ -117,7 +124,7 @@ Arrays.sort(serverNames);
     long startcode = serverName.getStartcode();
 </%java>
 <tr>
-    <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+    <td><& serverNameLink; serverName=serverName; &></td>
     <td><% new Date(startcode) %></td>
     <td><% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %></td>
     <td><% version %></td>
@@ -164,7 +171,7 @@ for (ServerName serverName: serverNames) {
     if (sl != null) {
 </%java>
 <tr>
-    <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+    <td><& serverNameLink; serverName=serverName; &></td>
     <td><% TraditionalBinaryPrefix.long2String(sl.getUsedHeapMB()
       * TraditionalBinaryPrefix.MEGA.value, "B", 1) %></td>
     <td><% TraditionalBinaryPrefix.long2String(sl.getMaxHeapMB()
@@ -207,7 +214,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
 if (sl != null) {
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% String.format("%.0f", sl.getRequestsPerSecond()) %></td>
 <td><% sl.getReadRequestsCount() %></td>
 <td><% sl.getWriteRequestsCount() %></td>
@@ -249,7 +256,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
 if (sl != null) {
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% sl.getStores() %></td>
 <td><% sl.getStorefiles() %></td>
 <td><% TraditionalBinaryPrefix.long2String(
@@ -300,7 +307,7 @@ if  (sl.getTotalCompactingKVs() > 0) {
 }
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% sl.getTotalCompactingKVs() %></td>
 <td><% sl.getCurrentCompactedKVs() %></td>
 <td><% sl.getTotalCompactingKVs() - sl.getCurrentCompactedKVs() %></td>
@@ -318,11 +325,72 @@ if  (sl.getTotalCompactingKVs() > 0) {
 </table>
 </%def>
 
+<%def replicationStats>
+<%args>
+    ServerName [] serverNames;
+</%args>
+<%java>
+        HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap
+            = master.getReplicationLoad(serverNames);
+        List<String> peers = null;
+        if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){
+            peers = new ArrayList<>(replicationLoadSourceMap.keySet());
+            Collections.sort(peers);
+        }
+</%java>
+
+<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %>
+
+<div class="tabbable">
+    <ul class="nav nav-tabs">
+        <%java>
+        String active = "active";
+        for (String peer : peers){
+        </%java>
+            <li class="<% active %>"><a href="#tab_<% peer %>" data-toggle="tab">Peer <% peer %></a> </li>
+        <%java>
+        active = "";
+        }
+        </%java>
+    </ul>
+    <div class="tab-content">
+        <%java>
+            active = "active";
+            for (String peer : peers){
+        </%java>
+            <div class="tab-pane <% active %>" id="tab_<% peer %>">
+                <table class="table table-striped">
+                    <tr>
+                        <th>Server</th>
+                        <th>AgeOfLastShippedOp</th>
+                        <th>SizeOfLogQueue</th>
+                        <th>ReplicationLag</th>
+                    </tr>
+
+                    <%for Pair<ServerName, ReplicationLoadSource> pair: replicationLoadSourceMap.get(peer) %>
+                    <tr>
+                        <td><& serverNameLink; serverName=pair.getFirst(); &></td>
+                        <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
+                        <td><% pair.getSecond().getSizeOfLogQueue() %></td>
+                        <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+                    </tr>
+                    </%for>
+        </table>
+            </div>
+        <%java>
+                active = "";
+                }
+        </%java>
+    </div>
+</div>
+<%else>
+    <p>No Peers Metrics</p>
+</%if>
+</%def>
 
 <%def serverNameLink>
         <%args>
         ServerName serverName;
-        ServerLoad serverLoad;
         </%args>
         <%java>
         int infoPort = master.getRegionServerInfoPort(serverName);
@@ -341,7 +409,7 @@ if  (sl.getTotalCompactingKVs() > 0) {
         ServerName serverName;
         </%args>
     <tr>
-    <td><& serverNameLink; serverName=serverName; serverLoad = null; &></td>
+    <td><& serverNameLink; serverName=serverName; &></td>
     <td></td>
     <td></td>
     <td></td>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index a8d4003..bc3bbd7 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -122,6 +122,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
     <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
     </section>
 
+    <section>
+    <h2>Replication Status</h2>
+    <& ReplicationStatusTmpl; regionServer = regionServer; &>
+    </section>
 
     <section>
     <h2>Software Attributes</h2>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
new file mode 100644
index 0000000..7dc1c7f
--- /dev/null
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -0,0 +1,105 @@
+<%doc>
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+</%doc>
+<%args>
+        HRegionServer regionServer;
+</%args>
+<%import>
+        java.util.*;
+        java.util.Map.Entry;
+        org.apache.hadoop.hbase.procedure2.util.StringUtils;
+        org.apache.hadoop.hbase.regionserver.HRegionServer;
+        org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+        Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+    <div class="tabbable">
+        <ul class="nav nav-pills">
+            <li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
+            <li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
+        </ul>
+        <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
+            <div class="tab-pane active" id="tab_currentLog">
+                <& currentLog; metrics = walGroupsReplicationStatus; &>
+            </div>
+            <div class="tab-pane" id="tab_replicationDelay">
+                <& replicationDelay; metrics = walGroupsReplicationStatus; &>
+            </div>
+        </div>
+    </div>
+    <p> If the replication delay is UNKNOWN, this walGroup has not started replicating yet and it may be disabled.
+    If the size of the log is 0, we are replicating the current HLog, so an accurate size is not available because the log is not closed yet.</p>
+
+<%else>
+    <p>No Replication Metrics for Peers</p>
+</%if>
+
+<%def currentLog>
+<%args>
+    Map<String, ReplicationStatus> metrics;
+</%args>
+    <table class="table table-striped">
+        <tr>
+            <th>PeerId</th>
+            <th>WalGroup</th>
+            <th>Current Log</th>
+            <th>Size</th>
+            <th>Queue Size</th>
+            <th>Offset</th>
+        </tr>
+            <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+                 <tr>
+                     <td><% entry.getValue().getPeerId() %></td>
+                     <td><% entry.getValue().getWalGroup() %></td>
+                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+                     <td><% entry.getValue().getQueueSize() %></td>
+                     <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+                 </tr>
+            </%for>
+    </table>
+</%def>
+
+<%def replicationDelay>
+<%args>
+    Map<String, ReplicationStatus> metrics;
+</%args>
+    <table class="table table-striped">
+        <tr>
+            <th>PeerId</th>
+            <th>WalGroup</th>
+            <th>Current Log</th>
+            <th>Last Shipped Age</th>
+            <th>Replication Delay</th>
+        </tr>
+            <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+                 <tr>
+                     <td><% entry.getValue().getPeerId() %></td>
+                     <td><% entry.getValue().getWalGroup() %></td>
+                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+                     <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+                 </tr>
+            </%for>
+    </table>
+</%def>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0d0b5c1..c1405b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.User;
@@ -3292,4 +3293,24 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   public LoadBalancer getLoadBalancer() {
     return balancer;
   }
+
+  /** Maps each peer id to the (server, source) pairs reported by the given servers. */
+  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
+    getReplicationLoad(ServerName[] serverNames) {
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> byPeer = new HashMap<>();
+    for (ServerName serverName : serverNames) {
+      org.apache.hadoop.hbase.ServerLoad load = getServerManager().getLoad(serverName);
+      if (load != null) { // the status templates also null-check getLoad()'s result
+        for (ReplicationLoadSource source : load.getReplicationLoadSourceList()) {
+          List<Pair<ServerName, ReplicationLoadSource>> list = byPeer.get(source.getPeerID());
+          if (list == null) {
+            list = new ArrayList<>();
+            byPeer.put(source.getPeerID(), list);
+          }
+          list.add(new Pair<>(serverName, source));
+        }
+      }
+    }
+    return byPeer;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f7737de..258f68e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -164,6 +164,8 @@ import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2832,6 +2834,20 @@ public class HRegionServer extends HasThread implements
     return service;
   }
 
+  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
+    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+    if(!this.isOnline()){
+      return walGroupsReplicationStatus;
+    }
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+    for(ReplicationSourceInterface source: allSources){
+      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+    }
+    return walGroupsReplicationStatus;
+  }
+
   /**
    * Utility for constructing an instance of the passed HRegionServer class.
    *
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 25a27a9..3a0ed85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -57,5 +57,5 @@ public interface ReplicationService {
   /**
    * Refresh and Get ReplicationLoad
    */
-  public ReplicationLoad refreshAndGetReplicationLoad();
+  ReplicationLoad refreshAndGetReplicationLoad();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 13b502b..93f98f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
 /**
  * A source for a replication stream has to expose this service.
@@ -33,4 +34,9 @@ public interface ReplicationSourceService extends ReplicationService {
    * observe log rolls and log archival events.
    */
   WALActionsListener getWALActionsListener();
+
+  /**
+   * Returns the replication manager
+   */
+  ReplicationSourceManager getReplicationManager();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 53e1074..5503632 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -37,10 +35,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class MetricsSource implements BaseSource {
 
-  private static final Log LOG = LogFactory.getLog(MetricsSource.class);
-
   // tracks last shipped timestamp for each wal group
-  private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
+  private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
 
@@ -87,7 +84,8 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
-    this.lastTimeStamps.put(walGroup, timestamp);
+    this.ageOfLastShippedOp.put(walGroup, age);
+    this.lastTimestamps.put(walGroup, timestamp);
   }
 
   /**
@@ -105,14 +103,32 @@ public class MetricsSource implements BaseSource {
     this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
   }
   /**
+   * Get the last shipped timestamp of the given wal group, or 0 if none has been recorded.
+   * @param walGroup which group we are getting
+   * @return timeStamp
+   */
+  public long getLastTimeStampOfWalGroup(String walGroup) {
+    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+  }
+
+  /**
+   * Get the age of the last shipped op of the given wal group, or 0 if none has been recorded.
+   * @param walGroup which group we are getting
+   * @return age
+   */
+  public long getAgeofLastShippedOp(String walGroup) {
+    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+  }
+
+  /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
    * @param walGroupId id of the group to update
    */
   public void refreshAgeOfLastShippedOp(String walGroupId) {
-    Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
+    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
     if (lastTimestamp == null) {
-      this.lastTimeStamps.put(walGroupId, 0L);
+      this.lastTimestamps.put(walGroupId, 0L);
       lastTimestamp = 0L;
     }
     if (lastTimestamp > 0) {
@@ -204,7 +220,8 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
-    lastTimeStamps.clear();
+    lastTimestamps.clear();
+    ageOfLastShippedOp.clear();
     lastHFileRefsQueueSize = 0;
   }
 
@@ -230,7 +247,7 @@ public class MetricsSource implements BaseSource {
    */
   public long getTimeStampOfLastShippedOp() {
     long lastTimestamp = 0L;
-    for (long ts : lastTimeStamps.values()) {
+    for (long ts : lastTimestamps.values()) {
       if (ts > lastTimestamp) {
         lastTimestamp = ts;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 2ead3df..51fbc93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -79,19 +79,8 @@ public class ReplicationLoad {
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
-      long replicationLag;
-      long timePassedAfterLastShippedOp =
-          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
-      if (sizeOfLogQueue != 0) {
-        // err on the large side
-        replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
-      } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
-        replicationLag = ageOfLastShippedOp; // last shipped happen recently
-      } else {
-        // last shipped may happen last night,
-        // so NO real lag although ageOfLastShippedOp is non-zero
-        replicationLag = 0;
-      }
+      long replicationLag =
+          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
 
       ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
       if (rLoadSource != null) {
@@ -116,6 +105,24 @@ public class ReplicationLoad {
         replicationLoadSourceMap.values());
   }
 
+  static long calculateReplicationDelay(long ageOfLastShippedOp,
+      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+    long replicationLag;
+    long timePassedAfterLastShippedOp =
+        EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+    if (sizeOfLogQueue > 1) {
+      // err on the large side
+      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      replicationLag = ageOfLastShippedOp; // last shipped happen recently
+    } else {
+      // last shipped may happen last night,
+      // so NO real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
+    }
+    return replicationLag;
+  }
+
   /**
    * sourceToString
    * @return a string contains sourceReplicationLoad information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2396655..cae7c21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -501,6 +502,38 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      String walGroupId = worker.getWalGroupId();
+      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      int queueSize = queues.get(walGroupId).size();
+      replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      Path currentPath = worker.getCurrentPath();
+      try {
+        fileSize = fs.getContentSummary(currentPath).getLength();
+      } catch (IOException e) {
+        fileSize = -1;
+      }
+      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+      statusBuilder.withPeerId(this.getPeerClusterId())
+          .withQueueSize(queueSize)
+          .withWalGroup(walGroupId)
+          .withCurrentPath(currentPath)
+          .withCurrentPosition(worker.getCurrentPosition())
+          .withFileSize(fileSize)
+          .withAgeOfLastShippedOp(ageOfLastShippedOp)
+          .withReplicationDelay(replicationDelay);
+      sourceReplicationStatus.put(this.getPeerClusterId() + "=>" + walGroupId,
+        statusBuilder.build());
+    }
+    return sourceReplicationStatus;
+  }
+
   // This thread reads entries from a queue and ships them.
   // Entries are placed onto the queue by ReplicationSourceWALReaderThread
   public class ReplicationSourceShipperThread extends Thread {
@@ -525,6 +558,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       this.source = source;
     }
 
+    /**
+     * @return the WAL group id held by this shipper thread
+     */
+    public String getWalGroupId() {
+      return this.walGroupId;
+    }
+
     @Override
     public void run() {
       setWorkerState(WorkerState.RUNNING);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index e7569ed..8bfbca0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,4 +128,9 @@ public interface ReplicationSourceInterface {
    */
   MetricsSource getSourceMetrics();
 
+  /**
+   * Gets the current replication status of each WAL group handled by this source.
+   * @return map whose values describe the replication status of one WAL group each
+   */
+  Map<String, ReplicationStatus> getWalGroupStatus();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 03fe229..ec5e862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -251,7 +252,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
     return entryBatchQueue.take();
   }
 
-  public long getEntrySizeIncludeBulkLoad(Entry entry) {
+  /**
+   * Retrieves the next batch from the entry batch queue, waiting at most the given time.
+   * @param timeout maximum time to wait, in milliseconds
+   * @return whatever the queue's timed poll yields; presumably null when no batch arrives in
+   *         time (BlockingQueue contract -- confirm the declared type of entryBatchQueue)
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntryBatch poll(long timeout) throws InterruptedException {
+    final WALEntryBatch nextBatch = entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+    return nextBatch;
+  }
+
+  private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
     WALKey key = entry.getKey();
     return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
@@ -470,9 +475,5 @@ public class ReplicationSourceWALReaderThread extends Thread {
     private void incrementHeapSize(long increment) {
       heapSize += increment;
     }
-
-    private void setLastPosition(String region, Long sequenceId) {
-      getLastSeqIds().put(region, sequenceId);
-    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 0000000..41076cc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Read-only snapshot of the replication state of a single WAL group for a single peer. All
+ * fields are final and are copied once from a {@link ReplicationStatusBuilder}; obtain a
+ * builder through {@link #newBuilder()}.
+ */
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+  // Identification of the WAL group being reported on.
+  private final String peerId;
+  private final String walGroup;
+  private final Path currentPath;
+  // Progress figures.
+  private final int queueSize;
+  private final long ageOfLastShippedOp;
+  private final long replicationDelay;
+  private final long currentPosition;
+  private final long fileSize;
+
+  /**
+   * Copies every value out of the builder; only reachable through
+   * {@link ReplicationStatusBuilder#build()}.
+   */
+  private ReplicationStatus(ReplicationStatusBuilder source) {
+    peerId = source.peerId;
+    walGroup = source.walGroup;
+    currentPath = source.currentPath;
+    queueSize = source.queueSize;
+    ageOfLastShippedOp = source.ageOfLastShippedOp;
+    replicationDelay = source.replicationDelay;
+    currentPosition = source.currentPosition;
+    fileSize = source.fileSize;
+  }
+
+  /**
+   * @return a fresh builder whose string and path fields start as "UNKNOWN" and whose numeric
+   *         fields start as -1
+   */
+  public static ReplicationStatusBuilder newBuilder() {
+    return new ReplicationStatusBuilder();
+  }
+
+  /** @return the peer id this status belongs to */
+  public String getPeerId() {
+    return this.peerId;
+  }
+
+  /** @return the WAL group this status describes */
+  public String getWalGroup() {
+    return this.walGroup;
+  }
+
+  /** @return the path recorded as the current one for this WAL group */
+  public Path getCurrentPath() {
+    return this.currentPath;
+  }
+
+  /** @return the recorded queue size */
+  public int getQueueSize() {
+    return this.queueSize;
+  }
+
+  /** @return the recorded age of the last shipped operation */
+  public long getAgeOfLastShippedOp() {
+    return this.ageOfLastShippedOp;
+  }
+
+  /** @return the recorded replication delay */
+  public long getReplicationDelay() {
+    return this.replicationDelay;
+  }
+
+  /** @return the recorded current position */
+  public long getCurrentPosition() {
+    return this.currentPosition;
+  }
+
+  /** @return the recorded file size */
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  /**
+   * Fluent builder for {@link ReplicationStatus}. Every {@code withXxx} method stores its
+   * argument and returns this builder so that calls can be chained.
+   */
+  public static class ReplicationStatusBuilder {
+    // Placeholders that remain visible in the built status when a value is never supplied.
+    private String peerId = "UNKNOWN";
+    private String walGroup = "UNKNOWN";
+    private Path currentPath = new Path("UNKNOWN");
+    private int queueSize = -1;
+    private long ageOfLastShippedOp = -1;
+    private long replicationDelay = -1;
+    private long currentPosition = -1;
+    private long fileSize = -1;
+
+    public ReplicationStatusBuilder withPeerId(String peerId) {
+      this.peerId = peerId;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withWalGroup(String walGroup) {
+      this.walGroup = walGroup;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+      this.currentPath = currentPath;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withQueueSize(int queueSize) {
+      this.queueSize = queueSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+      this.replicationDelay = replicationDelay;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+      this.currentPosition = currentPosition;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withFileSize(long fileSize) {
+      this.fileSize = fileSize;
+      return this;
+    }
+
+    /** @return a new status carrying the values currently held by this builder */
+    public ReplicationStatus build() {
+      return new ReplicationStatus(this);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
new file mode 100644
index 0000000..bad92b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that replication load sources reported by a region server can be read back from the
+ * master through {@code getReplicationLoad}, grouped by peer id.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestGetReplicationLoad {
+  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static MiniHBaseCluster cluster;
+  private static HMaster master;
+  private static ReplicationAdmin admin;
+
+  private static final String ID_1 = "1";
+  private static final String ID_2 = "2";
+  private static final String KEY_1 = "127.0.0.1:2181:/hbase";
+  private static final String KEY_2 = "127.0.0.1:2181:/hbase2";
+
+  /**
+   * Master whose own region server report is a no-op, so that the only load the test reads
+   * back is the report it submits by hand.
+   */
+  public static class MyMaster extends HMaster {
+    public MyMaster(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, KeeperException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
+      // do nothing
+    }
+  }
+
+  @BeforeClass
+  public static void startCluster() throws Exception {
+    LOG.info("Starting cluster");
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    // Use the master class declared in this test rather than borrowing the one from
+    // TestMasterMetrics, so this test does not depend on another test class.
+    TEST_UTIL.startMiniCluster(1, 1, 1, null, MyMaster.class, null);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+    admin = new ReplicationAdmin(conf);
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    if (admin != null) {
+      admin.close();
+    }
+    if (TEST_UTIL != null) {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Submits a hand-built region server report carrying two replication load sources and
+   * verifies that the master returns them keyed by the peer ids used in the report.
+   */
+  @Test
+  public void testGetReplicationMetrics() throws Exception {
+    String peer1 = "test1", peer2 = "test2";
+    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
+    int sizeOfLogQueue = 5;
+    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
+        RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
+    ServerName serverName = cluster.getMaster(0).getServerName();
+    request.setServer(ProtobufUtil.toServerName(serverName));
+    ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
+        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
+        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+        .setSizeOfLogQueue(sizeOfLogQueue).build();
+    // The second source uses every value of the first plus one, so the two can be told apart.
+    ClusterStatusProtos.ReplicationLoadSource rload2 =
+        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
+            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
+            .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
+            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
+        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
+    request.setLoad(sl);
+
+    ReplicationPeerConfig peerConfig_1 = new ReplicationPeerConfig();
+    peerConfig_1.setClusterKey(KEY_1);
+    ReplicationPeerConfig peerConfig_2 = new ReplicationPeerConfig();
+    peerConfig_2.setClusterKey(KEY_2);
+    admin.addPeer(ID_1, peerConfig_1);
+    admin.addPeer(ID_2, peerConfig_2);
+
+    master.getMasterRpcServices().regionServerReport(null, request.build());
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
+        master.getReplicationLoad(new ServerName[] { serverName });
+    assertEquals("peer size ", 2, replicationLoad.size());
+    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
+    assertEquals("log queue size of peer1", sizeOfLogQueue,
+      replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
+    assertEquals("replication lag of peer2", replicationLag + 1,
+      replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
+
+    master.stopMaster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ad8c52f..a5c61f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -106,4 +109,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public MetricsSource getSourceMetrics() {
     return metrics;
   }
+
+  /**
+   * The dummy source tracks no WAL groups.
+   * @return a new, empty, mutable map
+   */
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    final Map<String, ReplicationStatus> noStatus = new HashMap<>();
+    return noStatus;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index d0f40a6..ddd3195 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -77,6 +77,7 @@ public class TestReplicationBase {
   protected static final byte[] famName = Bytes.toBytes("f");
   protected static final byte[] row = Bytes.toBytes("row");
   protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+  protected static final String PEER_ID2 = "2"; // peer id asserted by TestReplicationMetricsforUI
 
   /**
    * @throws java.lang.Exception
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 0000000..d507b76
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Exercises the per WAL group replication status that a region server exposes for the web UI:
+ * peer id, queue size, replication delay, current position and current path.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+
+  private static final byte[] qualName = Bytes.toBytes("q");
+  /** Upper bound, in milliseconds, on waiting for one row to appear on the peer cluster. */
+  private static final long REPLICATION_TIMEOUT_MS = 60000;
+  /** Pause, in milliseconds, between two look-ups on the peer cluster. */
+  private static final long POLL_INTERVAL_MS = 500;
+
+  /**
+   * Blocks until the given row is readable from the peer table, failing the test instead of
+   * hanging forever when it does not show up within {@link #REPLICATION_TIMEOUT_MS}.
+   * @param rowName row key, as a string, to look for in the peer table
+   */
+  private void waitForReplication(String rowName) throws Exception {
+    long deadline = System.currentTimeMillis() + REPLICATION_TIMEOUT_MS;
+    while (htable2.get(new Get(Bytes.toBytes(rowName))).size() == 0) {
+      if (System.currentTimeMillis() > deadline) {
+        Assert.fail("Row '" + rowName + "' was not replicated within " + REPLICATION_TIMEOUT_MS
+            + " ms");
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+  }
+
+  @Test
+  public void testReplicationMetrics() throws Exception {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      Put p = new Put(Bytes.toBytes("starter"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication done
+      waitForReplication("starter");
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+      Assert.assertEquals("metric size ", 1, metrics.size());
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        long pos = metric.getValue().getCurrentPosition();
+        // Semantics are a bit different in branch-1: If not started, pos will be -1
+        if (pos == -1) {
+          pos = 0;
+        }
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+      }
+      // Write a batch of rows and wait until the last one has reached the peer.
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes(Integer.toString(i)));
+        p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+        htable1.put(p);
+      }
+      waitForReplication(Integer.toString(NB_ROWS_IN_BATCH - 1));
+      rs = utility1.getRSForFirstRegionInTable(tableName);
+      metrics = rs.getWalGroupsReplicationStatus();
+      // Remember the WAL path in use before the roll so the change can be asserted afterwards.
+      Path lastPath = null;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        lastPath = metric.getValue().getCurrentPath();
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+          metric.getValue().getAgeOfLastShippedOp() > 0);
+        long pos = metric.getValue().getCurrentPosition();
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+      }
+
+      hbaseAdmin.rollWALWriter(rs.getServerName());
+      p = new Put(Bytes.toBytes("trigger"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication rolled to a new log
+      waitForReplication("trigger");
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      metrics = rs.getWalGroupsReplicationStatus();
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        long pos = metric.getValue().getCurrentPosition();
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+      }
+    }
+  }
+}
\ No newline at end of file