You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/10/19 21:54:40 UTC
hbase git commit: HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad (Guanghao Zhang)
Repository: hbase
Updated Branches:
refs/heads/master 72db95388 -> 674511875
HBASE-16870 Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad (Guanghao Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67451187
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67451187
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67451187
Branch: refs/heads/master
Commit: 674511875d513ca3c031e63987288c45dacf56d9
Parents: 72db953
Author: tedyu <yu...@gmail.com>
Authored: Wed Oct 19 14:54:35 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Oct 19 14:54:35 2016 -0700
----------------------------------------------------------------------
.../replication/regionserver/MetricsSource.java | 2 +-
.../replication/regionserver/Replication.java | 13 +++++++--
.../regionserver/ReplicationLoad.java | 21 +++++++++++---
.../regionserver/ReplicationSourceManager.java | 6 ++++
.../replication/TestReplicationSmallTests.java | 30 +++++++++++++++++---
5 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index a647d03..68f32f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -82,7 +82,7 @@ public class MetricsSource implements BaseSource {
public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
- globalSourceSource.setLastShippedAge(age);
+ globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge()));
this.lastTimeStamps.put(walGroup, timestamp);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 8bf6c95..5f87690 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -376,15 +376,24 @@ public class Replication extends WALActionsListener.Base implements
}
private void buildReplicationLoad() {
- // get source
- List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
+ // get source
+ List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
for (ReplicationSourceInterface source : sources) {
if (source instanceof ReplicationSource) {
sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
}
}
+
+ // get old source
+ List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
+ for (ReplicationSourceInterface source : oldSources) {
+ if (source instanceof ReplicationSource) {
+ sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
+ }
+ }
+
// get sink
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 14f3fce..5772dd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
+import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
@@ -66,8 +68,10 @@ public class ReplicationLoad {
this.replicationLoadSink = rLoadSinkBuild.build();
// build the SourceLoad List
- this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
+ Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
+ new HashMap<String, ClusterStatusProtos.ReplicationLoadSource>();
for (MetricsSource sm : this.sourceMetricsList) {
+ String peerId = sm.getPeerID();
long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
int sizeOfLogQueue = sm.getSizeOfLogQueue();
long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
@@ -85,17 +89,26 @@ public class ReplicationLoad {
replicationLag = 0;
}
+ ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
+ if (rLoadSource != null) {
+ ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
+ sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
+ timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
+ timeStampOfLastShippedOp);
+ replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
+ }
ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
ClusterStatusProtos.ReplicationLoadSource.newBuilder();
- rLoadSourceBuild.setPeerID(sm.getPeerID());
+ rLoadSourceBuild.setPeerID(peerId);
rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
rLoadSourceBuild.setReplicationLag(replicationLag);
- this.replicationLoadSourceList.add(rLoadSourceBuild.build());
+ replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
}
-
+ this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(
+ replicationLoadSourceMap.values());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a6f1891..b2f3b8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -530,6 +530,9 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+ if (src instanceof ReplicationSource) {
+ ((ReplicationSource) src).getSourceMetrics().clear();
+ }
this.oldsources.remove(src);
deleteSource(src.getPeerClusterZnode(), false);
this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
@@ -579,6 +582,9 @@ public class ReplicationSourceManager implements ReplicationListener {
}
for (ReplicationSourceInterface toRemove : srcToRemove) {
toRemove.terminate(terminateMessage);
+ if (toRemove instanceof ReplicationSource) {
+ ((ReplicationSource) toRemove).getSourceMetrics().clear();
+ }
this.sources.remove(toRemove);
}
deleteSource(id, true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index a199d4a..f915dd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -719,7 +719,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testReplicationStatus() throws Exception {
LOG.info("testReplicationStatus");
- try (Admin admin = utility1.getConnection().getAdmin()) {
+ try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+ // Wait for the roll log request in setUp() to finish
+ Thread.sleep(5000);
+
+ // disable peer
+ admin.disablePeer(PEER_ID);
final byte[] qualName = Bytes.toBytes("q");
Put p;
@@ -730,7 +735,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
htable1.put(p);
}
- ClusterStatus status = admin.getClusterStatus();
+ ClusterStatus status = hbaseAdmin.getClusterStatus();
+ long globalSizeOfLogQueue = 0;
for (JVMClusterUtil.RegionServerThread thread :
utility1.getHBaseCluster().getRegionServerThreads()) {
@@ -739,8 +745,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
- // check SourceList has at least one entry
- assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
+ // check SourceList only has one entry
+ assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+ globalSizeOfLogQueue += rLoadSourceList.get(0).getSizeOfLogQueue();
// check Sink exist only as it is difficult to verify the value on the fly
assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
@@ -748,6 +755,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
(rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
}
+
+ // Stop one rs
+ utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
+ Thread.sleep(5000);
+ status = hbaseAdmin.getClusterStatus();
+ ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ServerLoad sl = status.getLoad(server);
+ List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
+ // check SourceList only has one entry
+ assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+ // The surviving rs now has its own queue plus one recovered queue from the dead rs
+ assertEquals(globalSizeOfLogQueue, rLoadSourceList.get(0).getSizeOfLogQueue());
+ } finally {
+ utility1.getHBaseCluster().getRegionServer(1).start();
+ admin.enablePeer(PEER_ID);
}
}