You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/07/10 04:38:51 UTC
[29/50] [abbrv] hbase git commit: HBASE-20193 Basic Replication Web UI - Regionserver
HBASE-20193 Basic Replication Web UI - Regionserver
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66ad9fde
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66ad9fde
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66ad9fde
Branch: refs/heads/HBASE-18477
Commit: 66ad9fdef8e8bff3ed2b77cd8865f724b345bd18
Parents: 380350d
Author: jingyuntian <ti...@gmail.com>
Authored: Tue Jul 3 11:46:14 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Jul 3 15:47:14 2018 +0800
----------------------------------------------------------------------
.../tmpl/master/RegionServerListTmpl.jamon | 3 +-
.../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 4 +
.../regionserver/ReplicationStatusTmpl.jamon | 105 +++++++++++++++
.../hbase/regionserver/HRegionServer.java | 16 +++
.../hbase/regionserver/ReplicationService.java | 2 +-
.../regionserver/ReplicationSourceService.java | 6 +
.../replication/regionserver/MetricsSource.java | 20 +++
.../regionserver/ReplicationLoad.java | 38 ++++--
.../regionserver/ReplicationSource.java | 48 +++++++
.../ReplicationSourceInterface.java | 10 ++
.../regionserver/ReplicationSourceShipper.java | 12 +-
.../ReplicationSourceWALReader.java | 5 +
.../regionserver/ReplicationStatus.java | 135 +++++++++++++++++++
.../TestReplicationMetricsforUI.java | 107 +++++++++++++++
14 files changed, 495 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index c6c7fc3..67d3305 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -383,7 +383,7 @@ if (totalCompactingCells > 0) {
<td><& serverNameLink; serverName=pair.getFirst(); &></td>
<td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
<td><% pair.getSecond().getSizeOfLogQueue() %></td>
- <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+ <td><% pair.getSecond().getReplicationLag() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
</tr>
</%for>
</table>
@@ -393,6 +393,7 @@ if (totalCompactingCells > 0) {
}
</%java>
</div>
+ <p>If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.</p>
</div>
<%else>
<p>No Peers Metrics</p>
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
index c9bfcc9..646d835 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon
@@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
<& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &>
</section>
+ <section>
+ <h2>Replication Status</h2>
+ <& ReplicationStatusTmpl; regionServer = regionServer; &>
+ </section>
<section>
<h2>Software Attributes</h2>
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
new file mode 100644
index 0000000..7dc1c7f
--- /dev/null
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon
@@ -0,0 +1,105 @@
+<%doc>
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+</%doc>
+<%args>
+ HRegionServer regionServer;
+</%args>
+<%import>
+ java.util.*;
+ java.util.Map.Entry;
+ org.apache.hadoop.hbase.procedure2.util.StringUtils;
+ org.apache.hadoop.hbase.regionserver.HRegionServer;
+ org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+</%import>
+
+<%java>
+ Map<String, ReplicationStatus> walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus();
+</%java>
+
+<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %>
+
+ <div class="tabbable">
+ <ul class="nav nav-pills">
+ <li class="active"><a href="#tab_currentLog" data-toggle="tab">Current Log</a> </li>
+ <li class=""><a href="#tab_replicationDelay" data-toggle="tab">Replication Delay</a></li>
+ </ul>
+ <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
+ <div class="tab-pane active" id="tab_currentLog">
+ <& currentLog; metrics = walGroupsReplicationStatus; &>
+ </div>
+ <div class="tab-pane" id="tab_replicationDelay">
+ <& replicationDelay; metrics = walGroupsReplicationStatus; &>
+ </div>
+ </div>
+ </div>
+ <p> If the replication delay is UNKNOWN, that means this walGroup doesn't start replicate yet and it may get disabled.
+ If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet.</p>
+
+<%else>
+ <p>No Replication Metrics for Peers</p>
+</%if>
+
+<%def currentLog>
+<%args>
+ Map<String, ReplicationStatus> metrics;
+</%args>
+ <table class="table table-striped">
+ <tr>
+ <th>PeerId</th>
+ <th>WalGroup</th>
+ <th>Current Log</th>
+ <th>Size</th>
+ <th>Queue Size</th>
+ <th>Offset</th>
+ </tr>
+ <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+ <tr>
+ <td><% entry.getValue().getPeerId() %></td>
+ <td><% entry.getValue().getWalGroup() %></td>
+ <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% StringUtils.humanSize(entry.getValue().getFileSize()) %></td>
+ <td><% entry.getValue().getQueueSize() %></td>
+ <td><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %></td>
+ </tr>
+ </%for>
+ </table>
+</%def>
+
+<%def replicationDelay>
+<%args>
+ Map<String, ReplicationStatus> metrics;
+</%args>
+ <table class="table table-striped">
+ <tr>
+ <th>PeerId</th>
+ <th>WalGroup</th>
+ <th>Current Log</th>
+ <th>Last Shipped Age</th>
+ <th>Replication Delay</th>
+ </tr>
+ <%for Map.Entry<String, ReplicationStatus> entry: metrics.entrySet() %>
+ <tr>
+ <td><% entry.getValue().getPeerId() %></td>
+ <td><% entry.getValue().getWalGroup() %></td>
+ <td><% entry.getValue().getCurrentPath() %> </td>
+ <td><% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %></td>
+ <td><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %></td>
+ </tr>
+ </%for>
+ </table>
+</%def>
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9509ea7..1614cf5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -2984,6 +2986,20 @@ public class HRegionServer extends HasThread implements
return service;
}
+ public Map<String, ReplicationStatus> getWalGroupsReplicationStatus(){
+ Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
+ if(!this.isOnline()){
+ return walGroupsReplicationStatus;
+ }
+ List<ReplicationSourceInterface> allSources = new ArrayList<>();
+ allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
+ allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
+ for(ReplicationSourceInterface source: allSources){
+ walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
+ }
+ return walGroupsReplicationStatus;
+ }
+
/**
* Utility for constructing an instance of the passed HRegionServer class.
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index c34231d..e9bbaea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -54,5 +54,5 @@ public interface ReplicationService {
/**
* Refresh and Get ReplicationLoad
*/
- public ReplicationLoad refreshAndGetReplicationLoad();
+ ReplicationLoad refreshAndGetReplicationLoad();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 09ec477..ffc9635 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -43,4 +44,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Return the replication peers.
*/
ReplicationPeers getReplicationPeers();
+
+ /**
+ * Returns the replication manager
+ */
+ ReplicationSourceManager getReplicationManager();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 7bc7084..906f0c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource {
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimestamps = new HashMap<>();
+ private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
private long lastHFileRefsQueueSize = 0;
private String id;
@@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
+ this.ageOfLastShippedOp.put(walGroup, age);
this.lastTimestamps.put(walGroup, timestamp);
}
@@ -104,6 +106,24 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Gets the last shipped timestamp of the given wal group, or 0 if none has been recorded.
+ * @param walGroup which group we are getting
+ * @return the recorded timestamp, or 0 when the group has no entry
+ */
+ public long getLastTimeStampOfWalGroup(String walGroup) {
+ return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+ }
+
+ /**
+ * Gets the age of the last shipped op of the given wal group, or 0 if none has been recorded.
+ * @param walGroup which group we are getting
+ * @return the recorded age, or 0 when the group has no entry
+ */
+ public long getAgeofLastShippedOp(String walGroup) {
+ return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
+ }
+
+ /**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.
* @param walGroupId id of the group to update
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 219c03d..53e560b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -79,19 +79,8 @@ public class ReplicationLoad {
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
- long replicationLag;
- long timePassedAfterLastShippedOp =
- EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
- if (sizeOfLogQueue != 0) {
- // err on the large side
- replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
- } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
- replicationLag = ageOfLastShippedOp; // last shipped happen recently
- } else {
- // last shipped may happen last night,
- // so NO real lag although ageOfLastShippedOp is non-zero
- replicationLag = 0;
- }
+ long replicationLag =
+ calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
if (rLoadSource != null) {
@@ -114,6 +103,29 @@ public class ReplicationLoad {
this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
}
+ static long calculateReplicationDelay(long ageOfLastShippedOp,
+ long timeStampOfLastShippedOp, int sizeOfLogQueue) {
+ long replicationLag;
+ long timePassedAfterLastShippedOp;
+ if (timeStampOfLastShippedOp == 0) { // replication has not started yet; report Long.MAX_VALUE
+ return Long.MAX_VALUE;
+ } else {
+ timePassedAfterLastShippedOp =
+ EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
+ }
+ if (sizeOfLogQueue > 1) {
+ // more than one log is queued: err on the large side
+ replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
+ } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
+ replicationLag = ageOfLastShippedOp; // the last shipment happened recently
+ } else {
+ // the last shipment may have happened long ago (e.g. last night),
+ // so there is NO real lag although ageOfLastShippedOp is non-zero
+ replicationLag = 0;
+ }
+ return replicationLag;
+ }
+
/**
* sourceToString
* @return a string contains sourceReplicationLoad information
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a0d8321..10fa50f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -312,6 +316,50 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
}
+ @Override
+ public Map<String, ReplicationStatus> getWalGroupStatus() {
+ Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+ long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+ for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
+ String walGroupId = walGroupShipper.getKey();
+ ReplicationSourceShipper shipper = walGroupShipper.getValue();
+ lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+ ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+ int queueSize = queues.get(walGroupId).size();
+ replicationDelay =
+ ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+ Path currentPath = shipper.getCurrentPath();
+ try {
+ fileSize = getFileSize(currentPath);
+ } catch (IOException e) {
+ LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
+ fileSize = -1;
+ }
+ ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
+ statusBuilder.withPeerId(this.getPeerId())
+ .withQueueSize(queueSize)
+ .withWalGroup(walGroupId)
+ .withCurrentPath(currentPath)
+ .withCurrentPosition(shipper.getCurrentPosition())
+ .withFileSize(fileSize)
+ .withAgeOfLastShippedOp(ageOfLastShippedOp)
+ .withReplicationDelay(replicationDelay);
+ sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
+ }
+ return sourceReplicationStatus;
+ }
+
+ private long getFileSize(Path currentPath) throws IOException {
+ long fileSize;
+ try {
+ fileSize = fs.getContentSummary(currentPath).getLength();
+ } catch (FileNotFoundException e) {
+ currentPath = getArchivedLogPath(currentPath, conf);
+ fileSize = fs.getContentSummary(currentPath).getLength();
+ }
+ return fileSize;
+ }
+
protected ReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<Path> queue) {
return new ReplicationSourceShipper(conf, walGroupId, queue, this);
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 3ce5bfe..df7a8cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@@ -183,6 +185,14 @@ public interface ReplicationSourceInterface {
ServerName getServerWALsBelongTo();
/**
+ * Gets the replication status of each wal group; this default implementation returns an empty map.
+ * @return replication status of each wal group
+ */
+ default Map<String, ReplicationStatus> getWalGroupStatus() {
+ return new HashMap<>();
+ }
+
+ /**
* @return whether this is a replication source for recovery.
*/
default boolean isRecovered() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 30696d1..5d6198e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -72,6 +72,8 @@ public class ReplicationSourceShipper extends Thread {
protected final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
+ private final int DEFAULT_TIMEOUT = 20000;
+ private final int getEntriesTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -83,6 +85,8 @@ public class ReplicationSourceShipper extends Thread {
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
+ this.getEntriesTimeout =
+ this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
}
@Override
@@ -98,7 +102,13 @@ public class ReplicationSourceShipper extends Thread {
continue;
}
try {
- WALEntryBatch entryBatch = entryReader.take();
+ WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+ if (entryBatch == null) {
+ // since there are no logs that need to be replicated, we refresh the ageOfLastShippedOp
+ source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+ walGroupId);
+ continue;
+ }
// the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 666ee2a..b3bdb02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -300,6 +301,10 @@ class ReplicationSourceWALReader extends Thread {
return entryBatchQueue.take();
}
+ public WALEntryBatch poll(long timeout) throws InterruptedException {
+ return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
+
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 0000000..10d6cd5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+ private final String peerId;
+ private final String walGroup;
+ private final Path currentPath;
+ private final int queueSize;
+ private final long ageOfLastShippedOp;
+ private final long replicationDelay;
+ private final long currentPosition;
+ private final long fileSize;
+
+ private ReplicationStatus(ReplicationStatusBuilder builder) {
+ this.peerId = builder.peerId;
+ this.walGroup = builder.walGroup;
+ this.currentPath = builder.currentPath;
+ this.queueSize = builder.queueSize;
+ this.ageOfLastShippedOp = builder.ageOfLastShippedOp;
+ this.replicationDelay = builder.replicationDelay;
+ this.currentPosition = builder.currentPosition;
+ this.fileSize = builder.fileSize;
+ }
+
+ public long getCurrentPosition() {
+ return currentPosition;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public String getPeerId() {
+ return peerId;
+ }
+
+ public String getWalGroup() {
+ return walGroup;
+ }
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public long getAgeOfLastShippedOp() {
+ return ageOfLastShippedOp;
+ }
+
+ public long getReplicationDelay() {
+ return replicationDelay;
+ }
+
+ public Path getCurrentPath() {
+ return currentPath;
+ }
+
+ public static ReplicationStatusBuilder newBuilder() {
+ return new ReplicationStatusBuilder();
+ }
+
+ public static class ReplicationStatusBuilder {
+ private String peerId = "UNKNOWN";
+ private String walGroup = "UNKNOWN";
+ private Path currentPath = new Path("UNKNOWN");
+ private int queueSize = -1;
+ private long ageOfLastShippedOp = -1;
+ private long replicationDelay = -1;
+ private long currentPosition = -1;
+ private long fileSize = -1;
+
+ public ReplicationStatusBuilder withPeerId(String peerId) {
+ this.peerId = peerId;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withFileSize(long fileSize) {
+ this.fileSize = fileSize;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withWalGroup(String walGroup) {
+ this.walGroup = walGroup;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+ this.currentPath = currentPath;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+ this.ageOfLastShippedOp = ageOfLastShippedOp;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+ this.replicationDelay = replicationDelay;
+ return this;
+ }
+
+ public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+ this.currentPosition = currentPosition;
+ return this;
+ }
+
+ public ReplicationStatus build() {
+ return new ReplicationStatus(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66ad9fde/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 0000000..8ff4d84
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class);
+ private static final byte[] qualName = Bytes.toBytes("q");
+
+ @Test
+ public void testReplicationMetrics() throws Exception {
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+ Put p = new Put(Bytes.toBytes("starter"));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+ htable1.put(p);
+ // make sure replication done
+ while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+ Thread.sleep(500);
+ }
+ // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+ Thread.sleep(5000);
+ HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+ Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+ Assert.assertEquals("metric size ", 1, metrics.size());
+ long lastPosition = 0;
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+ Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+ Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+ Assert.assertTrue("current position >= 0", metric.getValue().getCurrentPosition() >= 0);
+ lastPosition = metric.getValue().getCurrentPosition();
+ }
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p = new Put(Bytes.toBytes("" + Integer.toString(i)));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+ htable1.put(p);
+ }
+ while (htable2.get(new Get(Bytes.toBytes("" + Integer.toString(NB_ROWS_IN_BATCH - 1))))
+ .size() == 0) {
+ Thread.sleep(500);
+ }
+ rs = utility1.getRSForFirstRegionInTable(tableName);
+ metrics = rs.getWalGroupsReplicationStatus();
+ Path lastPath = null;
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ lastPath = metric.getValue().getCurrentPath();
+ Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+ Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+ metric.getValue().getAgeOfLastShippedOp() > 0);
+ Assert.assertTrue("current position should > last position",
+ metric.getValue().getCurrentPosition() - lastPosition > 0);
+ lastPosition = metric.getValue().getCurrentPosition();
+ }
+
+ hbaseAdmin.rollWALWriter(rs.getServerName());
+ p = new Put(Bytes.toBytes("trigger"));
+ p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+ htable1.put(p);
+ // make sure replication rolled to a new log
+ while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+ Thread.sleep(500);
+ }
+ // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+ Thread.sleep(5000);
+ metrics = rs.getWalGroupsReplicationStatus();
+ for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+ Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+ Assert.assertTrue("current position should < last position",
+ metric.getValue().getCurrentPosition() < lastPosition);
+ Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+ }
+ }
+ }
+}
\ No newline at end of file