You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/07/03 08:20:50 UTC

hbase git commit: HBASE-20193 Basic Replication Web UI - Regionserver

Repository: hbase
Updated Branches:
  refs/heads/master 380350d5b -> 66ad9fdef


HBASE-20193 Basic Replication Web UI - Regionserver

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66ad9fde
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66ad9fde
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66ad9fde

Branch: refs/heads/master
Commit: 66ad9fdef8e8bff3ed2b77cd8865f724b345bd18
Parents: 380350d
Author: jingyuntian <ti...@gmail.com>
Authored: Tue Jul 3 11:46:14 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Jul 3 15:47:14 2018 +0800

----------------------------------------------------------------------
 .../tmpl/master/RegionServerListTmpl.jamon      |   3 +-
 .../hbase/tmpl/regionserver/RSStatusTmpl.jamon  |   4 +
 .../regionserver/ReplicationStatusTmpl.jamon    | 105 +++++++++++++++
 .../hbase/regionserver/HRegionServer.java       |  16 +++
 .../hbase/regionserver/ReplicationService.java  |   2 +-
 .../regionserver/ReplicationSourceService.java  |   6 +
 .../replication/regionserver/MetricsSource.java |  20 +++
 .../regionserver/ReplicationLoad.java           |  38 ++++--
 .../regionserver/ReplicationSource.java         |  48 +++++++
 .../ReplicationSourceInterface.java             |  10 ++
 .../regionserver/ReplicationSourceShipper.java  |  12 +-
 .../ReplicationSourceWALReader.java             |   5 +
 .../regionserver/ReplicationStatus.java         | 135 +++++++++++++++++++
 .../TestReplicationMetricsforUI.java            | 107 +++++++++++++++
 14 files changed, 495 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index c6c7fc3..67d3305 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -383,7 +383,7 @@ if  (totalCompactingCells > 0) {
                         <td><& serverNameLink; serverName=pair.getFirst(); &></td>
                         <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
                         <td><% pair.getSecond().getSizeOfLogQueue() %></td>
-                        <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+                        <td><% pair.getSecond().getReplicationLag() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
                     </tr>
                     </%for>
         </table>
@@ -393,6 +393,7 @@ if  (totalCompactingCells > 0) {
                 }
         </%java>
     </div>
+    <p>If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.</p>
 </div>
 <%else>
     <p>No Peers Metrics</p>

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index c9bfcc9..646d835 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
     <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
     </section>
 
+    <section>
+    <h2>Replication Status</h1>
+    <& ReplicationStatusTmpl; regionServer = regionServer; &>
+    </section>
 
     <section>
     <h2>Software Attributes</h2>

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
new file mode 100644
index 0000000..7dc1c7f
--- /dev/null
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -0,0 +1,105 @@
+<%doc>
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+</%doc>
+<%args>
+        HRegionServer regionServer;
+</%args>
+<%import>
+        java.util.*;
+        java.util.Map.Entry;
+        org.apache.hadoop.hbase.procedure2.util.StringUtils;
+        org.apache.hadoop.hbase.regionserver.HRegionServer;
+        org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+        Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+    <div class="tabbable">
+        <ul class="nav nav-pills">
+            <li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
+            <li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
+        </ul>
+        <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
+            <div class="tab-pane active" id="tab_currentLog">
+                <& currentLog; metrics = walGroupsReplicationStatus; &>
+            </div>
+            <div class="tab-pane" id="tab_replicationDelay">
+                <& replicationDelay; metrics = walGroupsReplicationStatus; &>
+            </div>
+        </div>
+    </div>
+    <p> If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.
+    If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet.</p>
+
+<%else>
+    <p>No Replication Metrics for Peers</p>
+</%if>
+
+<%def currentLog>
+<%args>
+    Map<String, ReplicationStatus> metrics;
+</%args>
+    <table class="table table-striped">
+        <tr>
+            <th>PeerId</th>
+            <th>WalGroup</th>
+            <th>Current Log</th>
+            <th>Size</th>
+            <th>Queue Size</th>
+            <th>Offset</th>
+        </tr>
+            <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+                 <tr>
+                     <td><% entry.getValue().getPeerId() %></td>
+                     <td><% entry.getValue().getWalGroup() %></td>
+                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+                     <td><% entry.getValue().getQueueSize() %></td>
+                     <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+                 </tr>
+            </%for>
+    </table>
+</%def>
+
+<%def replicationDelay>
+<%args>
+    Map<String, ReplicationStatus> metrics;
+</%args>
+    <table class="table table-striped">
+        <tr>
+            <th>PeerId</th>
+            <th>WalGroup</th>
+            <th>Current Log</th>
+            <th>Last Shipped Age</th>
+            <th>Replication Delay</th>
+        </tr>
+            <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+                 <tr>
+                     <td><% entry.getValue().getPeerId() %></td>
+                     <td><% entry.getValue().getWalGroup() %></td>
+                     <td><% entry.getValue().getCurrentPath() %> </td>
+                     <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+                     <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+                 </tr>
+            </%for>
+    </table>
+</%def>

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9509ea7..1614cf5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -2984,6 +2986,20 @@ public class HRegionServer extends HasThread implements
     return service;
   }
 
+  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
+    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+    if(!this.isOnline()){
+      return walGroupsReplicationStatus;
+    }
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+    for(ReplicationSourceInterface source: allSources){
+      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+    }
+    return walGroupsReplicationStatus;
+  }
+
   /**
    * Utility for constructing an instance of the passed HRegionServer class.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index c34231d..e9bbaea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -54,5 +54,5 @@ public interface ReplicationService {
   /**
    * Refresh and Get ReplicationLoad
    */
-  public ReplicationLoad refreshAndGetReplicationLoad();
+  ReplicationLoad refreshAndGetReplicationLoad();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 09ec477..ffc9635 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -43,4 +44,9 @@ public interface ReplicationSourceService extends ReplicationService {
    * Return the replication peers.
    */
   ReplicationPeers getReplicationPeers();
+
+  /**
+   * Returns the replication manager
+   */
+  ReplicationSourceManager getReplicationManager();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 7bc7084..906f0c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource {
 
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
 
@@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
+    this.ageOfLastShippedOp.put(walGroup, age);
     this.lastTimestamps.put(walGroup, timestamp);
   }
 
@@ -104,6 +106,24 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * get the last timestamp of given wal group. If the walGroup is null, return 0.
+   * @param walGroup which group we are getting
+   * @return timeStamp
+   */
+  public long getLastTimeStampOfWalGroup(String walGroup) {
+    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+  }
+
+  /**
+   * get age of last shipped op of given wal group. If the walGroup is null, return 0
+   * @param walGroup which group we are getting
+   * @return age
+   */
+  public long getAgeofLastShippedOp(String walGroup) {
+    return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+  }
+
+  /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
    * @param walGroupId id of the group to update

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 219c03d..53e560b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -79,19 +79,8 @@ public class ReplicationLoad {
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long replicationLag;
-      long timePassedAfterLastShippedOp =
-          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
-      if (sizeOfLogQueue != 0) {
-        // err on the large side
-        replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
-      } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
-        replicationLag = ageOfLastShippedOp; // last shipped happen recently
-      } else {
-        // last shipped may happen last night,
-        // so NO real lag although ageOfLastShippedOp is non-zero
-        replicationLag = 0;
-      }
+      long replicationLag =
+          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
 
       ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
       if (rLoadSource != null) {
@@ -114,6 +103,29 @@ public class ReplicationLoad {
     this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
   }
 
+  static long calculateReplicationDelay(long ageOfLastShippedOp,
+      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+    long replicationLag;
+    long timePassedAfterLastShippedOp;
+    if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to Long.MAX_VALUE
+      return Long.MAX_VALUE;
+    } else {
+      timePassedAfterLastShippedOp =
+          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+    }
+    if (sizeOfLogQueue > 1) {
+      // err on the large side
+      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+      replicationLag = ageOfLastShippedOp; // last shipped happen recently
+    } else {
+      // last shipped may happen last night,
+      // so NO real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
+    }
+    return replicationLag;
+  }
+
   /**
    * sourceToString
    * @return a string contains sourceReplicationLoad information

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a0d8321..10fa50f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -312,6 +316,50 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
+      String walGroupId = walGroupShipper.getKey();
+      ReplicationSourceShipper shipper = walGroupShipper.getValue();
+      lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+      ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      int queueSize = queues.get(walGroupId).size();
+      replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      Path currentPath = shipper.getCurrentPath();
+      try {
+        fileSize = getFileSize(currentPath);
+      } catch (IOException e) {
+        LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
+        fileSize = -1;
+      }
+      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+      statusBuilder.withPeerId(this.getPeerId())
+          .withQueueSize(queueSize)
+          .withWalGroup(walGroupId)
+          .withCurrentPath(currentPath)
+          .withCurrentPosition(shipper.getCurrentPosition())
+          .withFileSize(fileSize)
+          .withAgeOfLastShippedOp(ageOfLastShippedOp)
+          .withReplicationDelay(replicationDelay);
+      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
+    }
+    return sourceReplicationStatus;
+  }
+
+  private long getFileSize(Path currentPath) throws IOException {
+    long fileSize;
+    try {
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    } catch (FileNotFoundException e) {
+      currentPath = getArchivedLogPath(currentPath, conf);
+      fileSize = fs.getContentSummary(currentPath).getLength();
+    }
+    return fileSize;
+  }
+
   protected ReplicationSourceShipper createNewShipper(String walGroupId,
       PriorityBlockingQueue<Path> queue) {
     return new ReplicationSourceShipper(conf, walGroupId, queue, this);

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 3ce5bfe..df7a8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -183,6 +185,14 @@ public interface ReplicationSourceInterface {
   ServerName getServerWALsBelongTo();
 
   /**
+   * get the stat of replication for each wal group.
+   * @return stat of replication
+   */
+  default Map<String, ReplicationStatus> getWalGroupStatus() {
+    return new HashMap<>();
+  }
+
+  /**
    * @return whether this is a replication source for recovery.
    */
   default boolean isRecovered() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 30696d1..5d6198e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -72,6 +72,8 @@ public class ReplicationSourceShipper extends Thread {
   protected final long sleepForRetries;
   // Maximum number of retries before taking bold actions
   protected final int maxRetriesMultiplier;
+  private final int DEFAULT_TIMEOUT = 20000;
+  private final int getEntriesTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -83,6 +85,8 @@ public class ReplicationSourceShipper extends Thread {
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+    this.getEntriesTimeout =
+        this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
   }
 
   @Override
@@ -98,7 +102,13 @@ public class ReplicationSourceShipper extends Thread {
         continue;
       }
       try {
-        WALEntryBatch entryBatch = entryReader.take();
+        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        if (entryBatch == null) {
+          // since there is no logs need to replicate, we refresh the ageOfLastShippedOp
+          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+            walGroupId);
+          continue;
+        }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
         if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
           noMoreData();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 666ee2a..b3bdb02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -300,6 +301,10 @@ class ReplicationSourceWALReader extends Thread {
     return entryBatchQueue.take();
   }
 
+  public WALEntryBatch poll(long timeout) throws InterruptedException {
+    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+  }
+
   private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
     WALKey key = entry.getKey();

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 0000000..10d6cd5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+  private final String peerId;
+  private final String walGroup;
+  private final Path currentPath;
+  private final int queueSize;
+  private final long ageOfLastShippedOp;
+  private final long replicationDelay;
+  private final long currentPosition;
+  private final long fileSize;
+
+  private ReplicationStatus(ReplicationStatusBuilder builder) {
+    this.peerId = builder.peerId;
+    this.walGroup = builder.walGroup;
+    this.currentPath = builder.currentPath;
+    this.queueSize = builder.queueSize;
+    this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
+    this.replicationDelay = builder.replicationDelay;
+    this.currentPosition = builder.currentPosition;
+    this.fileSize = builder.fileSize;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public String getPeerId() {
+    return peerId;
+  }
+
+  public String getWalGroup() {
+    return walGroup;
+  }
+
+  public int getQueueSize() {
+    return queueSize;
+  }
+
+  public long getAgeOfLastShippedOp() {
+    return ageOfLastShippedOp;
+  }
+
+  public long getReplicationDelay() {
+    return replicationDelay;
+  }
+
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  public static ReplicationStatusBuilder newBuilder() {
+    return new ReplicationStatusBuilder();
+  }
+
+  public static class ReplicationStatusBuilder {
+    private String peerId = "UNKNOWN";
+    private String walGroup = "UNKNOWN";
+    private Path currentPath = new Path("UNKNOWN");
+    private int queueSize = -1;
+    private long ageOfLastShippedOp = -1;
+    private long replicationDelay = -1;
+    private long currentPosition = -1;
+    private long fileSize = -1;
+
+    public ReplicationStatusBuilder withPeerId(String peerId) {
+      this.peerId = peerId;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withFileSize(long fileSize) {
+      this.fileSize = fileSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withWalGroup(String walGroup) {
+      this.walGroup = walGroup;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+      this.currentPath = currentPath;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withQueueSize(int queueSize) {
+      this.queueSize = queueSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+      this.replicationDelay = replicationDelay;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+      this.currentPosition = currentPosition;
+      return this;
+    }
+
+    public ReplicationStatus build() {
+      return new ReplicationStatus(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 0000000..8ff4d84
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class);
+  private static final byte[] qualName = Bytes.toBytes("q");
+
+  @Test
+  public void testReplicationMetrics() throws Exception {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      Put p = new Put(Bytes.toBytes("starter"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication done
+      while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+      Assert.assertEquals("metric size ", 1, metrics.size());
+      long lastPosition = 0;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position >= 0", metric.getValue().getCurrentPosition() >= 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes("" + Integer.toString(i)));
+        p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+        htable1.put(p);
+      }
+      while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
+          .size() == 0) {
+        Thread.sleep(500);
+      }
+      rs = utility1.getRSForFirstRegionInTable(tableName);
+      metrics = rs.getWalGroupsReplicationStatus();
+      Path lastPath = null;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        lastPath = metric.getValue().getCurrentPath();
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+          metric.getValue().getAgeOfLastShippedOp() > 0);
+        Assert.assertTrue("current position should > last position",
+          metric.getValue().getCurrentPosition() - lastPosition > 0);
+        lastPosition = metric.getValue().getCurrentPosition();
+      }
+
+      hbaseAdmin.rollWALWriter(rs.getServerName());
+      p = new Put(Bytes.toBytes("trigger"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication rolled to a new log
+      while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      metrics = rs.getWalGroupsReplicationStatus();
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        Assert.assertTrue("current position should < last position",
+          metric.getValue().getCurrentPosition() < lastPosition);
+        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+      }
+    }
+  }
+}
\ No newline at end of file