You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/08/24 01:02:07 UTC

[1/3] hbase git commit: HBASE-16448 Custom metrics for custom replication endpoints

Repository: hbase
Updated Branches:
  refs/heads/0.98 ce58f5890 -> aac4e0951
  refs/heads/branch-1 1e15fa57d -> 6e9b49cac
  refs/heads/master 91fee265c -> cb02be38a


HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb02be38
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb02be38
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb02be38

Branch: refs/heads/master
Commit: cb02be38ab20ec5343fc9a7450bed33461e38f10
Parents: 91fee26
Author: Geoffrey <gj...@salesforce.com>
Authored: Thu Aug 18 14:24:10 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:08 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  4 +-
 .../MetricsReplicationGlobalSourceSource.java   | 56 ++++++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 80 +++++++++++++++++---
 .../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
 .../replication/TestReplicationEndpoint.java    | 67 +++++++++++++++-
 5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private String keyPrefix;
+
   @Deprecated
   private final String shippedKBsKey;
   private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
-    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
 
-    sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
 
-    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
 
-    shippedOpsKey = "source." + this.id + ".shippedOps";
+    shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
 
-    shippedKBsKey = "source." + this.id + ".shippedKBs";
+    shippedKBsKey = this.keyPrefix + "shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
 
-    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
     shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
 
-    logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
 
-    logReadInEditsKey = "source." + id + ".logEditsRead";
+    logReadInEditsKey = this.keyPrefix + "logEditsRead";
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
 
-    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
 
-    shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+    shippedHFilesKey = this.keyPrefix + "shippedHFiles";
     shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
 
-    sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+    sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
     sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
   }
 
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * through the metrics interfaces.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
 
   private static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
@@ -61,6 +62,19 @@ public class MetricsSource {
   }
 
   /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+  }
+
+  /**
    * Set the age of the last edit that was shipped
    * @param timestamp write time of the edit
    * @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
       lastHFileRefsQueueSize = 0;
     }
   }
+
+  @Override
+  public void init() {
+    singleSourceSource.init();
+    globalSourceSource.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    singleSourceSource.setGauge(gaugeName, value);
+    globalSourceSource.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    singleSourceSource.incGauge(gaugeName, delta);
+    globalSourceSource.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    singleSourceSource.decGauge(gaugeName, delta);
+    globalSourceSource.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    singleSourceSource.removeMetric(key);
+    globalSourceSource.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    singleSourceSource.incCounters(counterName, delta);
+    globalSourceSource.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    singleSourceSource.updateHistogram(name, value);
+    globalSourceSource.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return globalSourceSource.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return globalSourceSource.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return globalSourceSource.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return globalSourceSource.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index a5a4e73..002b8c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,6 +52,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests ReplicationSource and ReplicationEndpoint interactions
  */
@@ -115,8 +120,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -255,6 +260,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+    String gaugeName = "gauge";
+    String singleGaugeName = "source.id." + gaugeName;
+    long delta = 1;
+    String counterName = "counter";
+    String singleCounterName = "source.id." + counterName;
+    long count = 2;
+    source.decGauge(gaugeName, delta);
+    source.getMetricsContext();
+    source.getMetricsDescription();
+    source.getMetricsJmxContext();
+    source.getMetricsName();
+    source.incCounters(counterName, count);
+    source.incGauge(gaugeName, delta);
+    source.init();
+    source.removeMetric(gaugeName);
+    source.setGauge(gaugeName, delta);
+    source.updateHistogram(counterName, count);
+
+    verify(singleRms).decGauge(singleGaugeName, delta);
+    verify(globalRms).decGauge(gaugeName, delta);
+    verify(globalRms).getMetricsContext();
+    verify(globalRms).getMetricsJmxContext();
+    verify(globalRms).getMetricsName();
+    verify(singleRms).incCounters(singleCounterName, count);
+    verify(globalRms).incCounters(counterName, count);
+    verify(singleRms).incGauge(singleGaugeName, delta);
+    verify(globalRms).incGauge(gaugeName, delta);
+    verify(globalRms).init();
+    verify(singleRms).removeMetric(singleGaugeName);
+    verify(globalRms).removeMetric(gaugeName);
+    verify(singleRms).setGauge(singleGaugeName, delta);
+    verify(globalRms).setGauge(gaugeName, delta);
+    verify(singleRms).updateHistogram(singleCounterName, count);
+    verify(globalRms).updateHistogram(counterName, count);
+  }
+
   private void doPut(byte[] row) throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
       doPut(connection, row);


[2/3] hbase git commit: HBASE-16448 Custom metrics for custom replication endpoints

Posted by ap...@apache.org.
HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e9b49ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e9b49ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e9b49ca

Branch: refs/heads/branch-1
Commit: 6e9b49cac7b118732659eef0cebb804be3e16238
Parents: 1e15fa5
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 14:42:07 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:42 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  4 +-
 .../MetricsReplicationGlobalSourceSource.java   | 56 ++++++++++++++
 .../MetricsReplicationSourceSourceImpl.java     | 80 +++++++++++++++++---
 .../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
 .../replication/TestReplicationEndpoint.java    | 67 +++++++++++++++-
 5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private String keyPrefix;
+
   @Deprecated
   private final String shippedKBsKey;
   private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
-    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
 
-    sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
 
-    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
 
-    shippedOpsKey = "source." + this.id + ".shippedOps";
+    shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
 
-    shippedKBsKey = "source." + this.id + ".shippedKBs";
+    shippedKBsKey = this.keyPrefix + "shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
 
-    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
     shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
 
-    logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
 
-    logReadInEditsKey = "source." + id + ".logEditsRead";
+    logReadInEditsKey = this.keyPrefix + "logEditsRead";
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
 
-    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
 
-    shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+    shippedHFilesKey = this.keyPrefix + "shippedHFiles";
     shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
 
-    sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+    sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
     sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
   }
 
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public int getSizeOfLogQueue() {
     return (int)sizeOfLogQueueGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * through the metrics interfaces.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
 
   private static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
@@ -61,6 +62,19 @@ public class MetricsSource {
   }
 
   /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+  }
+
+  /**
    * Set the age of the last edit that was shipped
    * @param timestamp write time of the edit
    * @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
       lastHFileRefsQueueSize = 0;
     }
   }
+
+  @Override
+  public void init() {
+    singleSourceSource.init();
+    globalSourceSource.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    singleSourceSource.setGauge(gaugeName, value);
+    globalSourceSource.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    singleSourceSource.incGauge(gaugeName, delta);
+    globalSourceSource.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    singleSourceSource.decGauge(gaugeName, delta);
+    globalSourceSource.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    singleSourceSource.removeMetric(key);
+    globalSourceSource.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    singleSourceSource.incCounters(counterName, delta);
+    globalSourceSource.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    singleSourceSource.updateHistogram(name, value);
+    globalSourceSource.updateHistogram(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return globalSourceSource.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return globalSourceSource.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return globalSourceSource.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return globalSourceSource.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..5a54314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +51,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests ReplicationSource and ReplicationEndpoint interactions
  */
@@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   }
 
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+    String gaugeName = "gauge";
+    String singleGaugeName = "source.id." + gaugeName;
+    long delta = 1;
+    String counterName = "counter";
+    String singleCounterName = "source.id." + counterName;
+    long count = 2;
+    source.decGauge(gaugeName, delta);
+    source.getMetricsContext();
+    source.getMetricsDescription();
+    source.getMetricsJmxContext();
+    source.getMetricsName();
+    source.incCounters(counterName, count);
+    source.incGauge(gaugeName, delta);
+    source.init();
+    source.removeMetric(gaugeName);
+    source.setGauge(gaugeName, delta);
+    source.updateHistogram(counterName, count);
+
+    verify(singleRms).decGauge(singleGaugeName, delta);
+    verify(globalRms).decGauge(gaugeName, delta);
+    verify(globalRms).getMetricsContext();
+    verify(globalRms).getMetricsJmxContext();
+    verify(globalRms).getMetricsName();
+    verify(singleRms).incCounters(singleCounterName, count);
+    verify(globalRms).incCounters(counterName, count);
+    verify(singleRms).incGauge(singleGaugeName, delta);
+    verify(globalRms).incGauge(gaugeName, delta);
+    verify(globalRms).init();
+    verify(singleRms).removeMetric(singleGaugeName);
+    verify(globalRms).removeMetric(gaugeName);
+    verify(singleRms).setGauge(singleGaugeName, delta);
+    verify(globalRms).setGauge(gaugeName, delta);
+    verify(singleRms).updateHistogram(singleCounterName, count);
+    verify(globalRms).updateHistogram(counterName, count);
+  }
+
   private void doPut(byte[] row) throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(conf1)) {
       doPut(connection, row);


[3/3] hbase git commit: HBASE-16448 Custom metrics for custom replication endpoints

Posted by ap...@apache.org.
HBASE-16448 Custom metrics for custom replication endpoints

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aac4e095
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aac4e095
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aac4e095

Branch: refs/heads/0.98
Commit: aac4e09514b8b7dcf9d253d5722d0fe813973b99
Parents: ce58f58
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 16:49:24 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:18:00 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  4 +-
 .../MetricsReplicationGlobalSourceSource.java   | 64 ++++++++++++++-
 .../MetricsReplicationSourceSourceImpl.java     | 81 ++++++++++++++++---
 .../replication/regionserver/MetricsSource.java | 85 +++++++++++++++++++-
 .../replication/TestReplicationEndpoint.java    | 76 ++++++++++++++++-
 5 files changed, 293 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 8611e15..ea0ae20 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hbase.replication.regionserver;
 
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
 
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index b3e1766..da1bcf4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
-
+  private final MetricsReplicationSource rms;
   private final MutableGaugeLong ageOfLastShippedOpGauge;
   private final MutableGaugeLong sizeOfLogQueueGauge;
   private final MutableCounterLong logReadInEditsCounter;
@@ -35,7 +35,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableCounterLong logReadInBytesCounter;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
-
+    this.rms = rms;
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L);
 
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
@@ -118,4 +118,64 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    rms.updateQuantile(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index d6a9128..55c9b05 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -31,6 +31,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private String keyPrefix;
+
   @Deprecated
   private final String shippedKBsKey;
   private final String shippedBytesKey;
@@ -49,32 +51,33 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
     this.id = id;
+    this.keyPrefix = "source." + this.id + ".";
 
-    ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+    ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
     ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(ageOfLastShippedOpKey, 0L);
 
-    sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+    sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
     sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfLogQueueKey, 0L);
 
-    shippedBatchesKey = "source." + this.id + ".shippedBatches";
+    shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(shippedBatchesKey, 0L);
 
-    shippedOpsKey = "source." + this.id + ".shippedOps";
+    shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(shippedOpsKey, 0L);
 
-    shippedKBsKey = "source." + this.id + ".shippedKBs";
+    shippedKBsKey = this.keyPrefix + "shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(shippedKBsKey, 0L);
 
-    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesKey = this.keyPrefix + "shippedBytes";
     shippedBytesCounter = rms.getMetricsRegistry().getLongCounter(shippedBytesKey, 0L);
 
-    logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+    logReadInBytesKey = this.keyPrefix + "logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(logReadInBytesKey, 0L);
 
-    logReadInEditsKey = "source." + id + ".logEditsRead";
+    logReadInEditsKey = this.keyPrefix + "logEditsRead";
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(logReadInEditsKey, 0L);
 
-    logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+    logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
   }
 
@@ -139,4 +142,64 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void init() {
+    rms.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    rms.setGauge(this.keyPrefix + gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    rms.incGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    rms.decGauge(this.keyPrefix + gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    rms.removeMetric(this.keyPrefix + key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    rms.incCounters(this.keyPrefix + counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    rms.updateHistogram(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    rms.updateQuantile(this.keyPrefix + name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return rms.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return rms.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return rms.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return rms.getMetricsName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 113bc5b..5824d83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * through the metrics interfaces.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
 
   public static final Log LOG = LogFactory.getLog(MetricsSource.class);
 
@@ -74,6 +75,19 @@ public class MetricsSource {
   }
 
   /**
+   * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+   * @param id Name of the source this class is monitoring
+   * @param singleSourceSource Class to monitor id-scoped metrics
+   * @param globalSourceSource Class to monitor global-scoped metrics
+   */
+  public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+                       MetricsReplicationSourceSource globalSourceSource) {
+    this.id = id;
+    this.singleSourceSource = singleSourceSource;
+    this.globalSourceSource = globalSourceSource;
+  }
+
+  /**
    * Set the age of the last edit that was shipped
    *
    * @param timestamp write time of the edit
@@ -196,4 +210,73 @@ public class MetricsSource {
   public String getPeerID() {
     return id;
   }
+  
+  @Override
+  public void init() {
+    singleSourceSource.init();
+    globalSourceSource.init();
+  }
+
+  @Override
+  public void setGauge(String gaugeName, long value) {
+    singleSourceSource.setGauge(gaugeName, value);
+    globalSourceSource.setGauge(gaugeName, value);
+  }
+
+  @Override
+  public void incGauge(String gaugeName, long delta) {
+    singleSourceSource.incGauge(gaugeName, delta);
+    globalSourceSource.incGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void decGauge(String gaugeName, long delta) {
+    singleSourceSource.decGauge(gaugeName, delta);
+    globalSourceSource.decGauge(gaugeName, delta);
+  }
+
+  @Override
+  public void removeMetric(String key) {
+    singleSourceSource.removeMetric(key);
+    globalSourceSource.removeMetric(key);
+  }
+
+  @Override
+  public void incCounters(String counterName, long delta) {
+    singleSourceSource.incCounters(counterName, delta);
+    globalSourceSource.incCounters(counterName, delta);
+  }
+
+  @Override
+  public void updateHistogram(String name, long value) {
+    singleSourceSource.updateHistogram(name, value);
+    globalSourceSource.updateHistogram(name, value);
+  }
+
+  @Override
+  public void updateQuantile(String name, long value) {
+    singleSourceSource.updateQuantile(name, value);
+    globalSourceSource.updateQuantile(name, value);
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return globalSourceSource.getMetricsContext();
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return globalSourceSource.getMetricsDescription();
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return globalSourceSource.getMetricsJmxContext();
+  }
+
+  @Override
+  public String getMetricsName() {
+    return globalSourceSource.getMetricsName();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index ae4c8e3..7d7df5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.regionserver.*;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -44,6 +47,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests ReplicationSource and ReplicationEndpoint interactions
  */
@@ -82,8 +89,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
   public void testCustomReplicationEndpoint() throws Exception {
     // test installing a custom replication endpoint other than the default one.
     admin.addPeer("testCustomReplicationEndpoint",
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
 
     // check whether the class has been constructed and started
     Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -123,8 +130,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
     final String id = "testReplicationEndpointReturnsFalseOnReplicate";
     admin.addPeer(id,
-      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
-        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+        new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+            .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
     // now replicate some data.
     doPut(row);
 
@@ -141,6 +148,66 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
   }
 
+  @Test
+  public void testMetricsSourceBaseSourcePassthrough(){
+    /*
+    The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+    and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+    Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+    allows for custom JMX metrics.
+    This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+    the two layers of wrapping to the actual BaseSource.
+    */
+    String id = "id";
+    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+    MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+    MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+    MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+    String gaugeName = "gauge";
+    String singleGaugeName = "source.id." + gaugeName;
+    long delta = 1;
+    String counterName = "counter";
+    String singleCounterName = "source.id." + counterName;
+    long count = 2;
+    source.decGauge(gaugeName, delta);
+    source.getMetricsContext();
+    source.getMetricsDescription();
+    source.getMetricsJmxContext();
+    source.getMetricsName();
+    source.incCounters(counterName, count);
+    source.incGauge(gaugeName, delta);
+    source.init();
+    source.removeMetric(gaugeName);
+    source.setGauge(gaugeName, delta);
+    source.updateHistogram(counterName, count);
+    source.updateQuantile(counterName, count);
+
+    verify(singleRms).decGauge(singleGaugeName, delta);
+    verify(globalRms).decGauge(gaugeName, delta);
+    verify(globalRms).getMetricsContext();
+    verify(globalRms).getMetricsJmxContext();
+    verify(globalRms).getMetricsName();
+    verify(singleRms).incCounters(singleCounterName, count);
+    verify(globalRms).incCounters(counterName, count);
+    verify(singleRms).incGauge(singleGaugeName, delta);
+    verify(globalRms).incGauge(gaugeName, delta);
+    verify(globalRms).init();
+    verify(singleRms).removeMetric(singleGaugeName);
+    verify(globalRms).removeMetric(gaugeName);
+    verify(singleRms).setGauge(singleGaugeName, delta);
+    verify(globalRms).setGauge(gaugeName, delta);
+    verify(singleRms).updateHistogram(singleCounterName, count);
+    verify(globalRms).updateHistogram(counterName, count);
+    verify(singleRms).updateQuantile(singleCounterName, count);
+    verify(globalRms).updateQuantile(counterName, count);
+
+  }
+
   @Test (timeout=120000)
   public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
     admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
@@ -253,6 +320,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       return true;
     }
 
+
     @Override
     public WALEntryFilter getWALEntryfilter() {
       return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {