You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/12/12 23:19:18 UTC

hbase git commit: HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication metrics) to branch-1

Repository: hbase
Updated Branches:
  refs/heads/branch-1.4 8c5d7ac2c -> c66d900d7


HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication metrics) to branch-1

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c66d900d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c66d900d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c66d900d

Branch: refs/heads/branch-1.4
Commit: c66d900d79481b07581e5cd57ace456d626f4007
Parents: 8c5d7ac
Author: Xu Cang <xc...@salesforce.com>
Authored: Fri Jul 6 16:36:05 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 12 15:17:29 2018 -0800

----------------------------------------------------------------------
 .../replication/regionserver/MetricsSource.java | 28 +++++++++++++++++---
 .../regionserver/ReplicationSource.java         |  5 ++++
 .../replication/TestReplicationEndpoint.java    | 22 ++++++++++++++-
 3 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c66d900d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index c08b187..53e1074 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -46,7 +46,7 @@ public class MetricsSource implements BaseSource {
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
-
+  private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
 
   /**
    * Constructor used to register the metrics
@@ -58,7 +58,9 @@ public class MetricsSource implements BaseSource {
     singleSourceSource =
         CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
             .getSource(id);
-    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    globalSourceSource = CompatibilitySingletonFactory
+        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    singleSourceSourceByTable = new HashMap<>();
   }
 
   /**
@@ -68,10 +70,12 @@ public class MetricsSource implements BaseSource {
    * @param globalSourceSource Class to monitor global-scoped metrics
    */
   public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
-                       MetricsReplicationSourceSource globalSourceSource) {
+                       MetricsReplicationSourceSource globalSourceSource,
+                       Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
     this.id = id;
     this.singleSourceSource = singleSourceSource;
     this.globalSourceSource = globalSourceSource;
+    this.singleSourceSourceByTable = singleSourceSourceByTable;
   }
 
   /**
@@ -87,6 +91,20 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Set the age of the last edit that was shipped, grouped by table
+   * @param timestamp write time of the edit
+   * @param tableName name of the table, used as the key for grouping the metric
+   */
+  public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
+    long age = EnvironmentEdgeManager.currentTime() - timestamp;
+    if (!this.getSingleSourceSourceByTable().containsKey(tableName)) {
+      this.getSingleSourceSourceByTable().put(tableName,
+          CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+          .getSource(tableName));
+    }
+    this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
+  }
+  /**
    * Convenience method to use the last given timestamp to refresh the age of the last edit. Used
    * when replication fails and need to keep that metric accurate.
    * @param walGroupId id of the group to update
@@ -345,6 +363,10 @@ public class MetricsSource implements BaseSource {
     return globalSourceSource.getMetricsName();
   }
 
+  public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+    return singleSourceSourceByTable;
+  }
+
   @Override
   public MetricRegistryInfo getMetricRegistryInfo() {
     return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/c66d900d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index b49f6bb..d3f2620 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -688,6 +688,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
             int size = entries.size();
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
+
+              TableName tableName = entries.get(i).getKey().getTablename();
+              source.getSourceMetrics().setAgeOfLastShippedOpByTable(
+                entries.get(i).getKey().getWriteTime(),
+                tableName.getNameAsString());
             }
             //Log and clean up WAL logs
             updateLogPosition(lastReadPosition);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c66d900d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index f2a5d58..68af95d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -309,7 +311,9 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
     MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
     doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
-    MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource);
+    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
+    MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
+        singleSourceSourceByTable);
     String gaugeName = "gauge";
     String singleGaugeName = "source.id." + gaugeName;
     long delta = 1;
@@ -346,6 +350,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     verify(singleRms).updateHistogram(singleCounterName, count);
     verify(globalRms).updateHistogram(counterName, count);
     verify(spyglobalSourceSource).incrFailedRecoveryQueue();
+
+    // check singleSourceSourceByTable metrics.
+    // singleSourceSourceByTable map entry will be created only
+    // after calling #setAgeOfLastShippedOpByTable
+    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
+        .containsKey("RandomNewTable");
+    Assert.assertEquals(false, containsRandomNewTable);
+    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+    containsRandomNewTable = source.getSingleSourceSourceByTable()
+      .containsKey("RandomNewTable");
+    Assert.assertEquals(true, containsRandomNewTable);
+    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+        .get("RandomNewTable");
+    // cannot assert a more concrete value here because the age is arbitrary;
+    // as long as it's greater than 0, we treat it as correct.
+    Assert.assertTrue(msr.getLastShippedAge() > 0);
   }
 
   private void doPut(byte[] row) throws IOException {