You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/08/24 01:02:08 UTC

[2/3] hbase git commit: HBASE-16448 Custom metrics for custom replication endpoints

HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e9b49ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e9b49ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e9b49ca

Branch: refs/heads/branch-1
Commit: 6e9b49cac7b118732659eef0cebb804be3e16238
Parents: 1e15fa5
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 14:42:07 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:42 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  4 +-
 .../MetricsReplicationGlobalSourceSource.java   | 56 ++++++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 80 +++++++++++++++++---
 .../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
 .../replication/TestReplicationEndpoint.java    | 67 +++++++++++++++-
 5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private String keyPrefix;
+
   @Deprecated
   private final String shippedKBsKey;
   private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
-    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
 
-    sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
 
-    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
 
-    shippedOpsKey = "source." + this.id + ".shippedOps";
+    shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
 
-    shippedKBsKey = "source." + this.id + ".shippedKBs";
+    shippedKBsKey = this.keyPrefix + "shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
 
-    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
     shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
 
-    logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
 
-    logReadInEditsKey = "source." + id + ".logEditsRead";
+    logReadInEditsKey = this.keyPrefix + "logEditsRead";
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
 
-    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
 
-    shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+    shippedHFilesKey = this.keyPrefix + "shippedHFiles";
     shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
 
-    sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+    sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
     sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
   }
 
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * through the metrics interfaces.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
 
   private static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
@@ -61,6 +62,19 @@ public class MetricsSource {
   }
 
   /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+  }
+
+  /**
    * Set the age of the last edit that was shipped
    * @param timestamp write time of the edit
    * @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
       lastHFileRefsQueueSize = 0;
     }
   }
+
+  @Override
+  public void init() {
+    singleSourceSource.init();
+    globalSourceSource.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    singleSourceSource.setGauge(gaugeName, value);
+    globalSourceSource.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    singleSourceSource.incGauge(gaugeName, delta);
+    globalSourceSource.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    singleSourceSource.decGauge(gaugeName, delta);
+    globalSourceSource.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    singleSourceSource.removeMetric(key);
+    globalSourceSource.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    singleSourceSource.incCounters(counterName, delta);
+    globalSourceSource.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    singleSourceSource.updateHistogram(name, value);
+    globalSourceSource.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return globalSourceSource.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return globalSourceSource.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return globalSourceSource.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return globalSourceSource.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..5a54314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +51,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests ReplicationSource and ReplicationEndpoint interactions
  */
@@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+    String gaugeName = "gauge";
+    String singleGaugeName = "source.id." + gaugeName;
+    long delta = 1;
+    String counterName = "counter";
+    String singleCounterName = "source.id." + counterName;
+    long count = 2;
+    source.decGauge(gaugeName, delta);
+    source.getMetricsContext();
+    source.getMetricsDescription();
+    source.getMetricsJmxContext();
+    source.getMetricsName();
+    source.incCounters(counterName, count);
+    source.incGauge(gaugeName, delta);
+    source.init();
+    source.removeMetric(gaugeName);
+    source.setGauge(gaugeName, delta);
+    source.updateHistogram(counterName, count);
+
+    verify(singleRms).decGauge(singleGaugeName, delta);
+    verify(globalRms).decGauge(gaugeName, delta);
+    verify(globalRms).getMetricsContext();
+    verify(globalRms).getMetricsJmxContext();
+    verify(globalRms).getMetricsName();
+    verify(singleRms).incCounters(singleCounterName, count);
+    verify(globalRms).incCounters(counterName, count);
+    verify(singleRms).incGauge(singleGaugeName, delta);
+    verify(globalRms).incGauge(gaugeName, delta);
+    verify(globalRms).init();
+    verify(singleRms).removeMetric(singleGaugeName);
+    verify(globalRms).removeMetric(gaugeName);
+    verify(singleRms).setGauge(singleGaugeName, delta);
+    verify(globalRms).setGauge(gaugeName, delta);
+    verify(singleRms).updateHistogram(singleCounterName, count);
+    verify(globalRms).updateHistogram(counterName, count);
+  }
+
   private void doPut(byte[] row) throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
       doPut(connection, row);