You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/10/19 21:54:40 UTC

hbase git commit: HBASE-16870 Add the metrics of replication sources which were transferred from other dead region servers to ReplicationLoad (Guanghao Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master 72db95388 -> 674511875


HBASE-16870 Add the metrics of replication sources which were transferred from other dead region servers to ReplicationLoad (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/67451187
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/67451187
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/67451187

Branch: refs/heads/master
Commit: 674511875d513ca3c031e63987288c45dacf56d9
Parents: 72db953
Author: tedyu <yu...@gmail.com>
Authored: Wed Oct 19 14:54:35 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Oct 19 14:54:35 2016 -0700

----------------------------------------------------------------------
 .../replication/regionserver/MetricsSource.java |  2 +-
 .../replication/regionserver/Replication.java   | 13 +++++++--
 .../regionserver/ReplicationLoad.java           | 21 +++++++++++---
 .../regionserver/ReplicationSourceManager.java  |  6 ++++
 .../replication/TestReplicationSmallTests.java  | 30 +++++++++++++++++---
 5 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index a647d03..68f32f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -82,7 +82,7 @@ public class MetricsSource implements BaseSource {
   public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
     long age = EnvironmentEdgeManager.currentTime() - timestamp;
     singleSourceSource.setLastShippedAge(age);
-    globalSourceSource.setLastShippedAge(age);
+    globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge()));
     this.lastTimeStamps.put(walGroup, timestamp);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 8bf6c95..5f87690 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -376,15 +376,24 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   private void buildReplicationLoad() {
-    // get source
-    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
 
+    // get source
+    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
     for (ReplicationSourceInterface source : sources) {
       if (source instanceof ReplicationSource) {
         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
       }
     }
+
+    // get old source
+    List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
+    for (ReplicationSourceInterface source : oldSources) {
+      if (source instanceof ReplicationSource) {
+        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
+      }
+    }
+
     // get sink
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);

http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 14f3fce..5772dd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
@@ -66,8 +68,10 @@ public class ReplicationLoad {
     this.replicationLoadSink = rLoadSinkBuild.build();
 
     // build the SourceLoad List
-    this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>();
+    Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
+        new HashMap<String, ClusterStatusProtos.ReplicationLoadSource>();
     for (MetricsSource sm : this.sourceMetricsList) {
+      String peerId = sm.getPeerID();
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp();
@@ -85,17 +89,26 @@ public class ReplicationLoad {
         replicationLag = 0;
       }
 
+      ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
+      if (rLoadSource != null) {
+        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
+        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
+        timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
+          timeStampOfLastShippedOp);
+        replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
+      }
       ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
           ClusterStatusProtos.ReplicationLoadSource.newBuilder();
-      rLoadSourceBuild.setPeerID(sm.getPeerID());
+      rLoadSourceBuild.setPeerID(peerId);
       rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
       rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
       rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
       rLoadSourceBuild.setReplicationLag(replicationLag);
 
-      this.replicationLoadSourceList.add(rLoadSourceBuild.build());
+      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
     }
-
+    this.replicationLoadSourceList = new ArrayList<ClusterStatusProtos.ReplicationLoadSource>(
+        replicationLoadSourceMap.values());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a6f1891..b2f3b8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -530,6 +530,9 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public void closeRecoveredQueue(ReplicationSourceInterface src) {
     LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
     this.oldsources.remove(src);
     deleteSource(src.getPeerClusterZnode(), false);
     this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
@@ -579,6 +582,9 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       for (ReplicationSourceInterface toRemove : srcToRemove) {
         toRemove.terminate(terminateMessage);
+        if (toRemove instanceof ReplicationSource) {
+          ((ReplicationSource) toRemove).getSourceMetrics().clear();
+        }
         this.sources.remove(toRemove);
       }
       deleteSource(id, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/67451187/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index a199d4a..f915dd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -719,7 +719,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testReplicationStatus() throws Exception {
     LOG.info("testReplicationStatus");
 
-    try (Admin admin = utility1.getConnection().getAdmin()) {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      // Wait roll log request in setUp() to finish
+      Thread.sleep(5000);
+
+      // disable peer
+      admin.disablePeer(PEER_ID);
 
       final byte[] qualName = Bytes.toBytes("q");
       Put p;
@@ -730,7 +735,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         htable1.put(p);
       }
 
-      ClusterStatus status = admin.getClusterStatus();
+      ClusterStatus status = hbaseAdmin.getClusterStatus();
+      long globalSizeOfLogQueue = 0;
 
       for (JVMClusterUtil.RegionServerThread thread :
           utility1.getHBaseCluster().getRegionServerThreads()) {
@@ -739,8 +745,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
         ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
 
-        // check SourceList has at least one entry
-        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() > 0));
+        // check SourceList only has one entry
+        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+        globalSizeOfLogQueue += rLoadSourceList.get(0).getSizeOfLogQueue();
 
         // check Sink exist only as it is difficult to verify the value on the fly
         assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
@@ -748,6 +755,21 @@ public class TestReplicationSmallTests extends TestReplicationBase {
         assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
           (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0));
       }
+
+      // Stop one rs
+      utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
+      Thread.sleep(5000);
+      status = hbaseAdmin.getClusterStatus();
+      ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+      ServerLoad sl = status.getLoad(server);
+      List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
+      // check SourceList only has one entry
+      assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+      // Another rs has one queue and one recovery queue from died rs
+      assertEquals(globalSizeOfLogQueue, rLoadSourceList.get(0).getSizeOfLogQueue());
+    } finally {
+      utility1.getHBaseCluster().getRegionServer(1).start();
+      admin.enablePeer(PEER_ID);
     }
   }