You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/01/22 02:00:36 UTC
[hbase] 01/02: HBASE-21680 Port HBASE-20194 (Basic Replication
WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver)
to branch-1
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1fd76bd35a8f69cc21e175c28967ea4c4bad4584
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Fri Jan 18 14:58:34 2019 -0800
HBASE-21680 Port HBASE-20194 (Basic Replication WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver) to branch-1
HBASE-20193 Basic Replication Web UI - Regionserver
HBASE-20194 Basic Replication WebUI - Master
---
.../hbase/tmpl/master/MasterStatusTmpl.jamon | 42 +++++++
.../hbase/tmpl/master/RegionServerListTmpl.jamon | 86 +++++++++++--
.../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 4 +
.../tmpl/regionserver/ReplicationStatusTmpl.jamon | 105 ++++++++++++++++
.../org/apache/hadoop/hbase/master/HMaster.java | 21 ++++
.../hadoop/hbase/regionserver/HRegionServer.java | 16 +++
.../hbase/regionserver/ReplicationService.java | 2 +-
.../regionserver/ReplicationSourceService.java | 6 +
.../replication/regionserver/MetricsSource.java | 37 ++++--
.../replication/regionserver/ReplicationLoad.java | 33 +++--
.../regionserver/ReplicationSource.java | 37 ++++++
.../regionserver/ReplicationSourceInterface.java | 6 +
.../ReplicationSourceWALReaderThread.java | 11 +-
.../regionserver/ReplicationStatus.java | 135 +++++++++++++++++++++
.../hbase/master/TestGetReplicationLoad.java | 133 ++++++++++++++++++++
.../hbase/replication/ReplicationSourceDummy.java | 8 ++
.../hbase/replication/TestReplicationBase.java | 1 +
.../replication/TestReplicationMetricsforUI.java | 105 ++++++++++++++++
18 files changed, 750 insertions(+), 38 deletions(-)
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
index efff6f7..f365221 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
@@ -43,6 +43,7 @@ org.apache.hadoop.hbase.ServerLoad;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.client.Admin;
org.apache.hadoop.hbase.client.HConnectionManager;
+org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
org.apache.hadoop.hbase.HRegionInfo;
org.apache.hadoop.hbase.master.RegionState;
org.apache.hadoop.hbase.HTableDescriptor;
@@ -52,6 +53,9 @@ org.apache.hadoop.hbase.tool.Canary;
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
org.apache.hadoop.hbase.master.DeadServer;
org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+org.apache.hadoop.hbase.replication.ReplicationPeer;
+org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+org.apache.hadoop.hbase.replication.ReplicationSerDeHelper;
org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
org.apache.hadoop.hbase.security.access.AccessControlLists;
org.apache.hadoop.hbase.quotas.QuotaUtil;
@@ -232,6 +236,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</div>
</div>
</section>
+ <section>
+ <h2>Peers</h2>
+ <& peerConfigs &>
+ </section>
<%if master.getAssignmentManager() != null %>
<& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&>
</%if>
@@ -554,3 +562,37 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
</table>
</%if>
</%def>
+
+<%def peerConfigs>
+<%java>
+ Map<String, ReplicationPeerConfig> peers = null;
+ try (ReplicationAdmin admin = new ReplicationAdmin(master.getConfiguration())) {
+ peers = admin.listPeerConfigs();
+ }
+</%java>
+<table class="table table-striped">
+ <tr>
+ <th>Peer Id</th>
+ <th>Cluster Key</th>
+ <th>Bandwidth</th>
+ <th>Table Cfs</th>
+ </tr>
+<%if (peers != null && peers.size() > 0)%>
+ <%for Map.Entry<String, ReplicationPeerConfig> peer : peers.entrySet() %>
+ <%java>
+ String peerId = peer.getKey();
+ ReplicationPeerConfig peerConfig = peer.getValue();
+ </%java>
+ <tr>
+ <td><% peerId %></td>
+ <td><% peerConfig.getClusterKey() %></td>
+ <td><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %></td>
+ <td>
+ <% peerConfig.getTableCFsMap() == null ? "" : ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %>
+ </td>
+ </tr>
+ </%for>
+</%if>
+<tr><td>Total: <% (peers != null) ? peers.size() : 0 %></td></tr>
+</table>
+</%def>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index 29c0f77..8116b28 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -26,6 +26,8 @@ HMaster master;
<%import>
java.util.*;
org.apache.hadoop.hbase.master.HMaster;
+ org.apache.hadoop.hbase.procedure2.util.StringUtils;
+ org.apache.hadoop.hbase.replication.ReplicationLoadSource;
org.apache.hadoop.hbase.ServerLoad;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -33,6 +35,7 @@ HMaster master;
org.apache.hadoop.hbase.HTableDescriptor;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.util.VersionInfo;
+ org.apache.hadoop.hbase.util.Pair;
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
</%import>
@@ -50,7 +53,8 @@ Arrays.sort(serverNames);
<li class=""><a href="#tab_memoryStats" data-toggle="tab">Memory</a></li>
<li class=""><a href="#tab_requestStats" data-toggle="tab">Requests</a></li>
<li class=""><a href="#tab_storeStats" data-toggle="tab">Storefiles</a></li>
- <li class=""><a href="#tab_compactStas" data-toggle="tab">Compactions</a></li>
+ <li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
+ <li class=""><a href="#tab_replicationStats" data-toggle="tab">Replications</a></li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_baseStats">
@@ -65,9 +69,12 @@ Arrays.sort(serverNames);
<div class="tab-pane" id="tab_storeStats">
<& storeStats; serverNames = serverNames; &>
</div>
- <div class="tab-pane" id="tab_compactStas">
+ <div class="tab-pane" id="tab_compactStats">
<& compactionStats; serverNames = serverNames; &>
</div>
+ <div class="tab-pane" id="tab_replicationStats">
+ <& replicationStats; serverNames = serverNames; &>
+ </div>
</div>
</div>
@@ -117,7 +124,7 @@ Arrays.sort(serverNames);
long startcode = serverName.getStartcode();
</%java>
<tr>
- <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+ <td><& serverNameLink; serverName=serverName; &></td>
<td><% new Date(startcode) %></td>
<td><% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %></td>
<td><% version %></td>
@@ -164,7 +171,7 @@ for (ServerName serverName: serverNames) {
if (sl != null) {
</%java>
<tr>
- <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+ <td><& serverNameLink; serverName=serverName; &></td>
<td><% TraditionalBinaryPrefix.long2String(sl.getUsedHeapMB()
* TraditionalBinaryPrefix.MEGA.value, "B", 1) %></td>
<td><% TraditionalBinaryPrefix.long2String(sl.getMaxHeapMB()
@@ -207,7 +214,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
if (sl != null) {
</%java>
<tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
<td><% String.format("%.0f", sl.getRequestsPerSecond()) %></td>
<td><% sl.getReadRequestsCount() %></td>
<td><% sl.getWriteRequestsCount() %></td>
@@ -249,7 +256,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName);
if (sl != null) {
</%java>
<tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
<td><% sl.getStores() %></td>
<td><% sl.getStorefiles() %></td>
<td><% TraditionalBinaryPrefix.long2String(
@@ -300,7 +307,7 @@ if (sl.getTotalCompactingKVs() > 0) {
}
</%java>
<tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
<td><% sl.getTotalCompactingKVs() %></td>
<td><% sl.getCurrentCompactedKVs() %></td>
<td><% sl.getTotalCompactingKVs() - sl.getCurrentCompactedKVs() %></td>
@@ -318,11 +325,72 @@ if (sl.getTotalCompactingKVs() > 0) {
</table>
</%def>
+<%def replicationStats>
+<%args>
+ ServerName [] serverNames;
+</%args>
+<%java>
+ HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap
+ = master.getReplicationLoad(serverNames);
+ List<String> peers = null;
+ if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){
+ peers = new ArrayList<>(replicationLoadSourceMap.keySet());
+ Collections.sort(peers);
+ }
+</%java>
+
+<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %>
+
+<div class="tabbable">
+ <ul class="nav nav-tabs">
+ <%java>
+ String active = "active";
+ for (String peer : peers){
+ </%java>
+ <li class=<% active %>><a href="#tab_<% peer %>" data-toggle="tab">Peer <% peer %></a> </li>
+ <%java>
+ active = "";
+ }
+ </%java>
+ </ul>
+ <div class="tab-content">
+ <%java>
+ active = "active";
+ for (String peer : peers){
+ </%java>
+ <div class="tab-pane <% active %>" id="tab_<% peer %>">
+ <table class="table table-striped">
+ <tr>
+ <th>Server</th>
+ <th>AgeOfLastShippedOp</th>
+ <th>SizeOfLogQueue</th>
+ <th>ReplicationLag</th>
+ </tr>
+
+ <%for Pair<ServerName, ReplicationLoadSource> pair: replicationLoadSourceMap.get(peer) %>
+ <tr>
+ <td><& serverNameLink; serverName=pair.getFirst(); &></td>
+ <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
+ <td><% pair.getSecond().getSizeOfLogQueue() %></td>
+ <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+ </tr>
+ </%for>
+ </table>
+ </div>
+ <%java>
+ active = "";
+ }
+ </%java>
+ </div>
+</div>
+<%else>
+ <p>No Peers Metrics</p>
+</%if>
+</%def>
<%def serverNameLink>
<%args>
ServerName serverName;
- ServerLoad serverLoad;
</%args>
<%java>
int infoPort = master.getRegionServerInfoPort(serverName);
@@ -341,7 +409,7 @@ if (sl.getTotalCompactingKVs() > 0) {
ServerName serverName;
</%args>
<tr>
- <td><& serverNameLink; serverName=serverName; serverLoad = null; &></td>
+ <td><& serverNameLink; serverName=serverName; &></td>
<td></td>
<td></td>
<td></td>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index a8d4003..bc3bbd7 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -122,6 +122,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
<& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
</section>
+ <section>
+ <h2>Replication Status</h1>
+ <& ReplicationStatusTmpl; regionServer = regionServer; &>
+ </section>
<section>
<h2>Software Attributes</h2>
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
new file mode 100644
index 0000000..7dc1c7f
--- /dev/null
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -0,0 +1,105 @@
+<%doc>
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+</%doc>
+<%args>
+ HRegionServer regionServer;
+</%args>
+<%import>
+ java.util.*;
+ java.util.Map.Entry;
+ org.apache.hadoop.hbase.procedure2.util.StringUtils;
+ org.apache.hadoop.hbase.regionserver.HRegionServer;
+ org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+ Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+ <div class="tabbable">
+ <ul class="nav nav-pills">
+ <li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
+ <li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
+ </ul>
+ <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
+ <div class="tab-pane active" id="tab_currentLog">
+ <& currentLog; metrics = walGroupsReplicationStatus; &>
+ </div>
+ <div class="tab-pane" id="tab_replicationDelay">
+ <& replicationDelay; metrics = walGroupsReplicationStatus; &>
+ </div>
+ </div>
+ </div>
+ <p> If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.
+ If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet.</p>
+
+<%else>
+ <p>No Replication Metrics for Peers</p>
+</%if>
+
+<%def currentLog>
+<%args>
+ Map<String, ReplicationStatus> metrics;
+</%args>
+ <table class="table table-striped">
+ <tr>
+ <th>PeerId</th>
+ <th>WalGroup</th>
+ <th>Current Log</th>
+ <th>Size</th>
+ <th>Queue Size</th>
+ <th>Offset</th>
+ </tr>
+ <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+ <tr>
+ <td><% entry.getValue().getPeerId() %></td>
+ <td><% entry.getValue().getWalGroup() %></td>
+ <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+ <td><% entry.getValue().getQueueSize() %></td>
+ <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+ </tr>
+ </%for>
+ </table>
+</%def>
+
+<%def replicationDelay>
+<%args>
+ Map<String, ReplicationStatus> metrics;
+</%args>
+ <table class="table table-striped">
+ <tr>
+ <th>PeerId</th>
+ <th>WalGroup</th>
+ <th>Current Log</th>
+ <th>Last Shipped Age</th>
+ <th>Replication Delay</th>
+ </tr>
+ <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+ <tr>
+ <td><% entry.getValue().getPeerId() %></td>
+ <td><% entry.getValue().getWalGroup() %></td>
+ <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+ <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+ </tr>
+ </%for>
+ </table>
+</%def>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0d0b5c1..c1405b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
@@ -3292,4 +3293,24 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
public LoadBalancer getLoadBalancer() {
return balancer;
}
+
+ public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
+ getReplicationLoad(ServerName[] serverNames) {
+ HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
+ new HashMap<>();
+ for (ServerName serverName : serverNames) {
+ List<ReplicationLoadSource> replicationLoadSources =
+ getServerManager().getLoad(serverName).getReplicationLoadSourceList();
+ for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
+ List<Pair<ServerName, ReplicationLoadSource>> list =
+ replicationLoadSourceMap.get(replicationLoadSource.getPeerID());
+ if (list == null) {
+ list = new ArrayList<Pair<ServerName, ReplicationLoadSource>>();
+ replicationLoadSourceMap.put(replicationLoadSource.getPeerID(), list);
+ }
+ list.add(new Pair<>(serverName, replicationLoadSource));
+ }
+ }
+ return replicationLoadSourceMap;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f7737de..258f68e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -164,6 +164,8 @@ import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -2832,6 +2834,20 @@ public class HRegionServer extends HasThread implements
return service;
}
+ public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
+ Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+ if(!this.isOnline()){
+ return walGroupsReplicationStatus;
+ }
+ List<ReplicationSourceInterface> allSources = new ArrayList<>();
+ allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+ allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+ for(ReplicationSourceInterface source: allSources){
+ walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+ }
+ return walGroupsReplicationStatus;
+ }
+
/**
* Utility for constructing an instance of the passed HRegionServer class.
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 25a27a9..3a0ed85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -57,5 +57,5 @@ public interface ReplicationService {
/**
* Refresh and Get ReplicationLoad
*/
- public ReplicationLoad refreshAndGetReplicationLoad();
+ ReplicationLoad refreshAndGetReplicationLoad();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 13b502b..93f98f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
/**
* A source for a replication stream has to expose this service.
@@ -33,4 +34,9 @@ public interface ReplicationSourceService extends ReplicationService {
* observe log rolls and log archival events.
*/
WALActionsListener getWALActionsListener();
+
+ /**
+ * Returns the replication manager
+ */
+ ReplicationSourceManager getReplicationManager();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 53e1074..5503632 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -37,10 +35,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource implements BaseSource {
- private static final Log LOG = LogFactory.getLog(MetricsSource.class);
-
// tracks last shipped timestamp for each wal group
- private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
+ private Map<String, Long> lastTimestamps = new HashMap<>();
+ private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
private long lastHFileRefsQueueSize = 0;
private String id;
@@ -87,7 +84,8 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
- this.lastTimeStamps.put(walGroup, timestamp);
+ this.ageOfLastShippedOp.put(walGroup, age);
+ this.lastTimestamps.put(walGroup, timestamp);
}
/**
@@ -105,14 +103,32 @@ public class MetricsSource implements BaseSource {
this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
}
/**
+ * get the last timestamp of given wal group. If the walGroup is null, return 0.
+ * @param walGroup which group we are getting
+ * @return timeStamp
+ */
+ public long getLastTimeStampOfWalGroup(String walGroup) {
+ return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+ }
+
+ /**
+ * get age of last shipped op of given wal group. If the walGroup is null, return 0
+ * @param walGroup which group we are getting
+ * @return age
+ */
+ public long getAgeofLastShippedOp(String walGroup) {
+ return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+ }
+
+ /**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.
* @param walGroupId id of the group to update
*/
public void refreshAgeOfLastShippedOp(String walGroupId) {
- Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
+ Long lastTimestamp = this.lastTimestamps.get(walGroupId);
if (lastTimestamp == null) {
- this.lastTimeStamps.put(walGroupId, 0L);
+ this.lastTimestamps.put(walGroupId, 0L);
lastTimestamp = 0L;
}
if (lastTimestamp > 0) {
@@ -204,7 +220,8 @@ public class MetricsSource implements BaseSource {
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
- lastTimeStamps.clear();
+ lastTimestamps.clear();
+ ageOfLastShippedOp.clear();
lastHFileRefsQueueSize = 0;
}
@@ -230,7 +247,7 @@ public class MetricsSource implements BaseSource {
*/
public long getTimeStampOfLastShippedOp() {
long lastTimestamp = 0L;
- for (long ts : lastTimeStamps.values()) {
+ for (long ts : lastTimestamps.values()) {
if (ts > lastTimestamp) {
lastTimestamp = ts;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 2ead3df..51fbc93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -79,19 +79,8 @@ public class ReplicationLoad {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
- long replicationLag;
- long timePassedAfterLastShippedOp =
- EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
- if (sizeOfLogQueue != 0) {
- // err on the large side
- replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
- } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
- replicationLag = ageOfLastShippedOp; // last shipped happen recently
- } else {
- // last shipped may happen last night,
- // so NO real lag although ageOfLastShippedOp is non-zero
- replicationLag = 0;
- }
+ long replicationLag =
+ calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
if (rLoadSource != null) {
@@ -116,6 +105,24 @@ public class ReplicationLoad {
replicationLoadSourceMap.values());
}
+ static long calculateReplicationDelay(long ageOfLastShippedOp,
+ long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+ long replicationLag;
+ long timePassedAfterLastShippedOp =
+ EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+ if (sizeOfLogQueue > 1) {
+ // err on the large side
+ replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+ } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+ replicationLag = ageOfLastShippedOp; // last shipped happen recently
+ } else {
+ // last shipped may happen last night,
+ // so NO real lag although ageOfLastShippedOp is non-zero
+ replicationLag = 0;
+ }
+ return replicationLag;
+ }
+
/**
* sourceToString
* @return a string contains sourceReplicationLoad information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2396655..cae7c21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -501,6 +502,38 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
+ @Override
+ public Map<String, ReplicationStatus> getWalGroupStatus() {
+ Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+ long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+ for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+ String walGroupId = worker.getWalGroupId();
+ lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+ ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+ int queueSize = queues.get(walGroupId).size();
+ replicationDelay =
+ ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+ Path currentPath = worker.getCurrentPath();
+ try {
+ fileSize = fs.getContentSummary(currentPath).getLength();
+ } catch (IOException e) {
+ fileSize = -1;
+ }
+ ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+ statusBuilder.withPeerId(this.getPeerClusterId())
+ .withQueueSize(queueSize)
+ .withWalGroup(walGroupId)
+ .withCurrentPath(currentPath)
+ .withCurrentPosition(worker.getCurrentPosition())
+ .withFileSize(fileSize)
+ .withAgeOfLastShippedOp(ageOfLastShippedOp)
+ .withReplicationDelay(replicationDelay);
+ sourceReplicationStatus.put(this.getPeerClusterId() + "=>" + walGroupId,
+ statusBuilder.build());
+ }
+ return sourceReplicationStatus;
+ }
+
// This thread reads entries from a queue and ships them.
// Entries are placed onto the queue by ReplicationSourceWALReaderThread
public class ReplicationSourceShipperThread extends Thread {
@@ -525,6 +558,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.source = source;
}
+ public String getWalGroupId() {
+ return walGroupId;
+ }
+
@Override
public void run() {
setWorkerState(WorkerState.RUNNING);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index e7569ed..8bfbca0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,4 +128,9 @@ public interface ReplicationSourceInterface {
*/
MetricsSource getSourceMetrics();
+ /**
+ * get the stat of replication for each wal group.
+ * @return stat of replication
+ */
+ Map<String, ReplicationStatus> getWalGroupStatus();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 03fe229..ec5e862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -251,7 +252,11 @@ public class ReplicationSourceWALReaderThread extends Thread {
return entryBatchQueue.take();
}
- public long getEntrySizeIncludeBulkLoad(Entry entry) {
+ public WALEntryBatch poll(long timeout) throws InterruptedException {
+ return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
@@ -470,9 +475,5 @@ public class ReplicationSourceWALReaderThread extends Thread {
private void incrementHeapSize(long increment) {
heapSize += increment;
}
-
- private void setLastPosition(String region, Long sequenceId) {
- getLastSeqIds().put(region, sequenceId);
- }
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 0000000..41076cc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+ private final String peerId;
+ private final String walGroup;
+ private final Path currentPath;
+ private final int queueSize;
+ private final long ageOfLastShippedOp;
+ private final long replicationDelay;
+ private final long currentPosition;
+ private final long fileSize;
+
+ private ReplicationStatus(ReplicationStatusBuilder builder) {
+ this.peerId = builder.peerId;
+ this.walGroup = builder.walGroup;
+ this.currentPath = builder.currentPath;
+ this.queueSize = builder.queueSize;
+ this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
+ this.replicationDelay = builder.replicationDelay;
+ this.currentPosition = builder.currentPosition;
+ this.fileSize = builder.fileSize;
+ }
+
+ public long getCurrentPosition() {
+ return currentPosition;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public String getWalGroup() {
+ return walGroup;
+ }
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public long getAgeOfLastShippedOp() {
+ return ageOfLastShippedOp;
+ }
+
+ public long getReplicationDelay() {
+ return replicationDelay;
+ }
+
+ public Path getCurrentPath() {
+ return currentPath;
+ }
+
+ public static ReplicationStatusBuilder newBuilder() {
+ return new ReplicationStatusBuilder();
+ }
+
+ public static class ReplicationStatusBuilder {
+ private String peerId = "UNKNOWN";
+ private String walGroup = "UNKNOWN";
+ private Path currentPath = new Path("UNKNOWN");
+ private int queueSize = -1;
+ private long ageOfLastShippedOp = -1;
+ private long replicationDelay = -1;
+ private long currentPosition = -1;
+ private long fileSize = -1;
+
+ public ReplicationStatusBuilder withPeerId(String peerId) {
+ this.peerId = peerId;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withWalGroup(String walGroup) {
+ this.walGroup = walGroup;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+ this.currentPath = currentPath;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+ this.ageOfLastShippedOp = ageOfLastShippedOp;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+ this.replicationDelay = replicationDelay;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+ this.currentPosition = currentPosition;
+ return this;
+ }
+
+ public ReplicationStatus build() {
+ return new ReplicationStatus(this);
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
new file mode 100644
index 0000000..bad92b8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestGetReplicationLoad {
+ private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
+
+ private static HBaseTestingUtility TEST_UTIL;
+ private static MiniHBaseCluster cluster;
+ private static HMaster master;
+ private static ReplicationAdmin admin;
+
+ private static final String ID_1 = "1";
+ private static final String ID_2 = "2";
+ private static final String KEY_1 = "127.0.0.1:2181:/hbase";
+ private static final String KEY_2 = "127.0.0.1:2181:/hbase2";
+
+ public static class MyMaster extends HMaster {
+ public MyMaster(Configuration conf, CoordinatedStateManager csm)
+ throws IOException, KeeperException, InterruptedException {
+ super(conf, csm);
+ }
+
+ @Override
+ protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
+ // do nothing
+ }
+ }
+
+ @BeforeClass
+ public static void startCluster() throws Exception {
+ LOG.info("Starting cluster");
+ TEST_UTIL = new HBaseTestingUtility();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+ TEST_UTIL.startMiniCluster(1, 1, 1, null, MyMaster.class, null);
+ cluster = TEST_UTIL.getHBaseCluster();
+ LOG.info("Waiting for active/ready master");
+ cluster.waitForActiveAndReadyMaster();
+ master = cluster.getMaster();
+ admin = new ReplicationAdmin(conf);
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ if (admin != null) {
+ admin.close();
+ }
+ if (TEST_UTIL != null) {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testGetReplicationMetrics() throws Exception {
+ String peer1 = "test1", peer2 = "test2";
+ long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
+ int sizeOfLogQueue = 5;
+ RegionServerStatusProtos.RegionServerReportRequest.Builder request =
+ RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
+ ServerName serverName = cluster.getMaster(0).getServerName();
+ request.setServer(ProtobufUtil.toServerName(serverName));
+ ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
+ .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
+ .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+ .setSizeOfLogQueue(sizeOfLogQueue).build();
+ ClusterStatusProtos.ReplicationLoadSource rload2 =
+ ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
+ .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
+ .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
+ .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+ ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
+ .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
+ request.setLoad(sl);
+
+ ReplicationPeerConfig peerConfig_1 = new ReplicationPeerConfig();
+ peerConfig_1.setClusterKey(KEY_1);
+ ReplicationPeerConfig peerConfig_2 = new ReplicationPeerConfig();
+ peerConfig_2.setClusterKey(KEY_2);
+ admin.addPeer(ID_1, peerConfig_1);
+ admin.addPeer(ID_2, peerConfig_2);
+
+ master.getMasterRpcServices().regionServerReport(null, request.build());
+ HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
+ master.getReplicationLoad(new ServerName[] { serverName });
+ assertEquals("peer size ", 2, replicationLoad.size());
+ assertEquals("load size ", 1, replicationLoad.get(peer1).size());
+ assertEquals("log queue size of peer1", sizeOfLogQueue,
+ replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
+ assertEquals("replication lag of peer2", replicationLag + 1,
+ replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
+
+ master.stopMaster();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ad8c52f..a5c61f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.util.Pair;
/**
@@ -106,4 +109,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public MetricsSource getSourceMetrics() {
return metrics;
}
+
+ @Override
+ public Map<String, ReplicationStatus> getWalGroupStatus() {
+ return new HashMap<>();
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index d0f40a6..ddd3195 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -77,6 +77,7 @@ public class TestReplicationBase {
protected static final byte[] famName = Bytes.toBytes("f");
protected static final byte[] row = Bytes.toBytes("row");
protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+ protected static final String PEER_ID2 = "2";
/**
* @throws java.lang.Exception
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 0000000..d507b76
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+
+ private static final byte[] qualName = Bytes.toBytes("q");
+
+ @Test
+ public void testReplicationMetrics() throws Exception {
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+ Put p = new Put(Bytes.toBytes("starter"));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+ htable1.put(p);
+ // make sure replication done
+ while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+ Thread.sleep(500);
+ }
+ // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+ Thread.sleep(5000);
+ HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+ Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+ Assert.assertEquals("metric size ", 1, metrics.size());
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+ Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+ Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+ long pos = metric.getValue().getCurrentPosition();
+ // Semantics are a bit different in branch-1: If not started, pos will be -1
+ if (pos == -1) {
+ pos = 0;
+ }
+ Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+ }
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p = new Put(Bytes.toBytes("" + Integer.toString(i)));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+ htable1.put(p);
+ }
+ while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
+ .size() == 0) {
+ Thread.sleep(500);
+ }
+ rs = utility1.getRSForFirstRegionInTable(tableName);
+ metrics = rs.getWalGroupsReplicationStatus();
+ Path lastPath = null;
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ lastPath = metric.getValue().getCurrentPath();
+ Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+ Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+ metric.getValue().getAgeOfLastShippedOp() > 0);
+ long pos = metric.getValue().getCurrentPosition();
+ Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+ }
+
+ hbaseAdmin.rollWALWriter(rs.getServerName());
+ p = new Put(Bytes.toBytes("trigger"));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+ htable1.put(p);
+ // make sure replication rolled to a new log
+ while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+ Thread.sleep(500);
+ }
+ // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+ Thread.sleep(5000);
+ metrics = rs.getWalGroupsReplicationStatus();
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+ long pos = metric.getValue().getCurrentPosition();
+ Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+ Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+ }
+ }
+ }
+}