You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/08/24 01:02:07 UTC
[1/3] hbase git commit: HBASE-16448 Custom metrics for custom
replication endpoints
Repository: hbase
Updated Branches:
refs/heads/0.98 ce58f5890 -> aac4e0951
refs/heads/branch-1 1e15fa57d -> 6e9b49cac
refs/heads/master 91fee265c -> cb02be38a
HBASE-16448 Custom metrics for custom replication endpoints
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb02be38
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb02be38
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb02be38
Branch: refs/heads/master
Commit: cb02be38ab20ec5343fc9a7450bed33461e38f10
Parents: 91fee26
Author: Geoffrey <gj...@salesforce.com>
Authored: Thu Aug 18 14:24:10 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:08 2016 -0700
----------------------------------------------------------------------
.../MetricsReplicationSourceSource.java | 4 +-
.../MetricsReplicationGlobalSourceSource.java | 56 ++++++++++++++
.../MetricsReplicationSourceSourceImpl.java | 80 +++++++++++++++++---
.../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
.../replication/TestReplicationEndpoint.java | 67 +++++++++++++++-
5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private String keyPrefix;
+
@Deprecated
private final String shippedKBsKey;
private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
this.id = id;
+ this.keyPrefix = "source." + this.id + ".";
- ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
- sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+ sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
- shippedBatchesKey = "source." + this.id + ".shippedBatches";
+ shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
- shippedOpsKey = "source." + this.id + ".shippedOps";
+ shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
- shippedKBsKey = "source." + this.id + ".shippedKBs";
+ shippedKBsKey = this.keyPrefix + "shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
- shippedBytesKey = "source." + this.id + ".shippedBytes";
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
- logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+ logReadInBytesKey = this.keyPrefix + "logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
- logReadInEditsKey = "source." + id + ".logEditsRead";
+ logReadInEditsKey = this.keyPrefix + "logEditsRead";
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
- logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+ logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
- shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+ shippedHFilesKey = this.keyPrefix + "shippedHFiles";
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
- sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+ sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
}
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* through the metrics interfaces.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
@@ -61,6 +62,19 @@ public class MetricsSource {
}
/**
+ * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+ * @param id Name of the source this class is monitoring
+ * @param singleSourceSource Class to monitor id-scoped metrics
+ * @param globalSourceSource Class to monitor global-scoped metrics
+ */
+ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+ MetricsReplicationSourceSource globalSourceSource) {
+ this.id = id;
+ this.singleSourceSource = singleSourceSource;
+ this.globalSourceSource = globalSourceSource;
+ }
+
+ /**
* Set the age of the last edit that was shipped
* @param timestamp write time of the edit
* @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
lastHFileRefsQueueSize = 0;
}
}
+
+ @Override
+ public void init() {
+ singleSourceSource.init();
+ globalSourceSource.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ singleSourceSource.setGauge(gaugeName, value);
+ globalSourceSource.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ singleSourceSource.incGauge(gaugeName, delta);
+ globalSourceSource.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ singleSourceSource.decGauge(gaugeName, delta);
+ globalSourceSource.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ singleSourceSource.removeMetric(key);
+ globalSourceSource.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ singleSourceSource.incCounters(counterName, delta);
+ globalSourceSource.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ singleSourceSource.updateHistogram(name, value);
+ globalSourceSource.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return globalSourceSource.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return globalSourceSource.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return globalSourceSource.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return globalSourceSource.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb02be38/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index a5a4e73..002b8c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -51,6 +52,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@@ -115,8 +120,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
- .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -255,6 +260,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
+ @Test
+ public void testMetricsSourceBaseSourcePassthrough(){
+ /*
+ The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+ and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+ Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+ allows for custom JMX metrics.
+ This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+ the two layers of wrapping to the actual BaseSource.
+ */
+ String id = "id";
+ DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+ MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+ when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+ MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+ when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+ MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+ String gaugeName = "gauge";
+ String singleGaugeName = "source.id." + gaugeName;
+ long delta = 1;
+ String counterName = "counter";
+ String singleCounterName = "source.id." + counterName;
+ long count = 2;
+ source.decGauge(gaugeName, delta);
+ source.getMetricsContext();
+ source.getMetricsDescription();
+ source.getMetricsJmxContext();
+ source.getMetricsName();
+ source.incCounters(counterName, count);
+ source.incGauge(gaugeName, delta);
+ source.init();
+ source.removeMetric(gaugeName);
+ source.setGauge(gaugeName, delta);
+ source.updateHistogram(counterName, count);
+
+ verify(singleRms).decGauge(singleGaugeName, delta);
+ verify(globalRms).decGauge(gaugeName, delta);
+ verify(globalRms).getMetricsContext();
+ verify(globalRms).getMetricsJmxContext();
+ verify(globalRms).getMetricsName();
+ verify(singleRms).incCounters(singleCounterName, count);
+ verify(globalRms).incCounters(counterName, count);
+ verify(singleRms).incGauge(singleGaugeName, delta);
+ verify(globalRms).incGauge(gaugeName, delta);
+ verify(globalRms).init();
+ verify(singleRms).removeMetric(singleGaugeName);
+ verify(globalRms).removeMetric(gaugeName);
+ verify(singleRms).setGauge(singleGaugeName, delta);
+ verify(globalRms).setGauge(gaugeName, delta);
+ verify(singleRms).updateHistogram(singleCounterName, count);
+ verify(globalRms).updateHistogram(counterName, count);
+ }
+
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, row);
[2/3] hbase git commit: HBASE-16448 Custom metrics for custom
replication endpoints
Posted by ap...@apache.org.
HBASE-16448 Custom metrics for custom replication endpoints
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e9b49ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e9b49ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e9b49ca
Branch: refs/heads/branch-1
Commit: 6e9b49cac7b118732659eef0cebb804be3e16238
Parents: 1e15fa5
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 14:42:07 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:42 2016 -0700
----------------------------------------------------------------------
.../MetricsReplicationSourceSource.java | 4 +-
.../MetricsReplicationGlobalSourceSource.java | 56 ++++++++++++++
.../MetricsReplicationSourceSourceImpl.java | 80 +++++++++++++++++---
.../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
.../replication/TestReplicationEndpoint.java | 67 +++++++++++++++-
5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private String keyPrefix;
+
@Deprecated
private final String shippedKBsKey;
private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
this.id = id;
+ this.keyPrefix = "source." + this.id + ".";
- ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
- sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+ sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
- shippedBatchesKey = "source." + this.id + ".shippedBatches";
+ shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
- shippedOpsKey = "source." + this.id + ".shippedOps";
+ shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
- shippedKBsKey = "source." + this.id + ".shippedKBs";
+ shippedKBsKey = this.keyPrefix + "shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
- shippedBytesKey = "source." + this.id + ".shippedBytes";
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
- logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+ logReadInBytesKey = this.keyPrefix + "logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
- logReadInEditsKey = "source." + id + ".logEditsRead";
+ logReadInEditsKey = this.keyPrefix + "logEditsRead";
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
- logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+ logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
- shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+ shippedHFilesKey = this.keyPrefix + "shippedHFiles";
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
- sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+ sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
}
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* through the metrics interfaces.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
@@ -61,6 +62,19 @@ public class MetricsSource {
}
/**
+ * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+ * @param id Name of the source this class is monitoring
+ * @param singleSourceSource Class to monitor id-scoped metrics
+ * @param globalSourceSource Class to monitor global-scoped metrics
+ */
+ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+ MetricsReplicationSourceSource globalSourceSource) {
+ this.id = id;
+ this.singleSourceSource = singleSourceSource;
+ this.globalSourceSource = globalSourceSource;
+ }
+
+ /**
* Set the age of the last edit that was shipped
* @param timestamp write time of the edit
* @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
lastHFileRefsQueueSize = 0;
}
}
+
+ @Override
+ public void init() {
+ singleSourceSource.init();
+ globalSourceSource.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ singleSourceSource.setGauge(gaugeName, value);
+ globalSourceSource.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ singleSourceSource.incGauge(gaugeName, delta);
+ globalSourceSource.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ singleSourceSource.decGauge(gaugeName, delta);
+ globalSourceSource.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ singleSourceSource.removeMetric(key);
+ globalSourceSource.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ singleSourceSource.incCounters(counterName, delta);
+ globalSourceSource.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ singleSourceSource.updateHistogram(name, value);
+ globalSourceSource.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return globalSourceSource.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return globalSourceSource.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return globalSourceSource.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return globalSourceSource.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..5a54314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -50,6 +51,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
- .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
+ @Test
+ public void testMetricsSourceBaseSourcePassthrough(){
+ /*
+ The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+ and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+ Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+ allows for custom JMX metrics.
+ This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+ the two layers of wrapping to the actual BaseSource.
+ */
+ String id = "id";
+ DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+ MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+ when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+ MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+ when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+ MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+ String gaugeName = "gauge";
+ String singleGaugeName = "source.id." + gaugeName;
+ long delta = 1;
+ String counterName = "counter";
+ String singleCounterName = "source.id." + counterName;
+ long count = 2;
+ source.decGauge(gaugeName, delta);
+ source.getMetricsContext();
+ source.getMetricsDescription();
+ source.getMetricsJmxContext();
+ source.getMetricsName();
+ source.incCounters(counterName, count);
+ source.incGauge(gaugeName, delta);
+ source.init();
+ source.removeMetric(gaugeName);
+ source.setGauge(gaugeName, delta);
+ source.updateHistogram(counterName, count);
+
+ verify(singleRms).decGauge(singleGaugeName, delta);
+ verify(globalRms).decGauge(gaugeName, delta);
+ verify(globalRms).getMetricsContext();
+ verify(globalRms).getMetricsJmxContext();
+ verify(globalRms).getMetricsName();
+ verify(singleRms).incCounters(singleCounterName, count);
+ verify(globalRms).incCounters(counterName, count);
+ verify(singleRms).incGauge(singleGaugeName, delta);
+ verify(globalRms).incGauge(gaugeName, delta);
+ verify(globalRms).init();
+ verify(singleRms).removeMetric(singleGaugeName);
+ verify(globalRms).removeMetric(gaugeName);
+ verify(singleRms).setGauge(singleGaugeName, delta);
+ verify(globalRms).setGauge(gaugeName, delta);
+ verify(singleRms).updateHistogram(singleCounterName, count);
+ verify(globalRms).updateHistogram(counterName, count);
+ }
+
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, row);
[3/3] hbase git commit: HBASE-16448 Custom metrics for custom
replication endpoints
Posted by ap...@apache.org.
HBASE-16448 Custom metrics for custom replication endpoints
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aac4e095
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aac4e095
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aac4e095
Branch: refs/heads/0.98
Commit: aac4e09514b8b7dcf9d253d5722d0fe813973b99
Parents: ce58f58
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 16:49:24 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:18:00 2016 -0700
----------------------------------------------------------------------
.../MetricsReplicationSourceSource.java | 4 +-
.../MetricsReplicationGlobalSourceSource.java | 64 ++++++++++++++-
.../MetricsReplicationSourceSourceImpl.java | 81 ++++++++++++++++---
.../replication/regionserver/MetricsSource.java | 85 +++++++++++++++++++-
.../replication/TestReplicationEndpoint.java | 76 ++++++++++++++++-
5 files changed, 293 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 8611e15..ea0ae20 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index b3e1766..da1bcf4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource {
-
+ private final MetricsReplicationSource rms;
private final MutableGaugeLong ageOfLastShippedOpGauge;
private final MutableGaugeLong sizeOfLogQueueGauge;
private final MutableCounterLong logReadInEditsCounter;
@@ -35,7 +35,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableCounterLong logReadInBytesCounter;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
-
+ this.rms = rms;
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L);
sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L);
@@ -118,4 +118,64 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(name, value);
+ }
+
+ @Override
+ public void updateQuantile(String name, long value) {
+ rms.updateQuantile(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index d6a9128..55c9b05 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -31,6 +31,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private String keyPrefix;
+
@Deprecated
private final String shippedKBsKey;
private final String shippedBytesKey;
@@ -49,32 +51,33 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
this.id = id;
+ this.keyPrefix = "source." + this.id + ".";
- ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(ageOfLastShippedOpKey, 0L);
- sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+ sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfLogQueueKey, 0L);
- shippedBatchesKey = "source." + this.id + ".shippedBatches";
+ shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(shippedBatchesKey, 0L);
- shippedOpsKey = "source." + this.id + ".shippedOps";
+ shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(shippedOpsKey, 0L);
- shippedKBsKey = "source." + this.id + ".shippedKBs";
+ shippedKBsKey = this.keyPrefix + "shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(shippedKBsKey, 0L);
- shippedBytesKey = "source." + this.id + ".shippedBytes";
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getLongCounter(shippedBytesKey, 0L);
- logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+ logReadInBytesKey = this.keyPrefix + "logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(logReadInBytesKey, 0L);
- logReadInEditsKey = "source." + id + ".logEditsRead";
+ logReadInEditsKey = this.keyPrefix + "logEditsRead";
logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(logReadInEditsKey, 0L);
- logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+ logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
}
@@ -139,4 +142,64 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public long getLastShippedAge() {
return ageOfLastShippedOpGauge.value();
}
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public void updateQuantile(String name, long value) {
+ rms.updateQuantile(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 113bc5b..5824d83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* through the metrics interfaces.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
public static final Log LOG = LogFactory.getLog(MetricsSource.class);
@@ -74,6 +75,19 @@ public class MetricsSource {
}
/**
+ * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+ * @param id Name of the source this class is monitoring
+ * @param singleSourceSource Class to monitor id-scoped metrics
+ * @param globalSourceSource Class to monitor global-scoped metrics
+ */
+ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+ MetricsReplicationSourceSource globalSourceSource) {
+ this.id = id;
+ this.singleSourceSource = singleSourceSource;
+ this.globalSourceSource = globalSourceSource;
+ }
+
+ /**
* Set the age of the last edit that was shipped
*
* @param timestamp write time of the edit
@@ -196,4 +210,73 @@ public class MetricsSource {
public String getPeerID() {
return id;
}
+
+ @Override
+ public void init() {
+ singleSourceSource.init();
+ globalSourceSource.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ singleSourceSource.setGauge(gaugeName, value);
+ globalSourceSource.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ singleSourceSource.incGauge(gaugeName, delta);
+ globalSourceSource.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ singleSourceSource.decGauge(gaugeName, delta);
+ globalSourceSource.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ singleSourceSource.removeMetric(key);
+ globalSourceSource.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ singleSourceSource.incCounters(counterName, delta);
+ globalSourceSource.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ singleSourceSource.updateHistogram(name, value);
+ globalSourceSource.updateHistogram(name, value);
+ }
+
+ @Override
+ public void updateQuantile(String name, long value) {
+ singleSourceSource.updateQuantile(name, value);
+ globalSourceSource.updateQuantile(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return globalSourceSource.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return globalSourceSource.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return globalSourceSource.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return globalSourceSource.getMetricsName();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/aac4e095/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index ae4c8e3..7d7df5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.replication.regionserver.*;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -44,6 +47,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@@ -82,8 +89,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
- .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -123,8 +130,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
- .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// now replicate some data.
doPut(row);
@@ -141,6 +148,66 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
}
+ @Test
+ public void testMetricsSourceBaseSourcePassthrough(){
+ /*
+ The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+ and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+ Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+ allows for custom JMX metrics.
+ This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+ the two layers of wrapping to the actual BaseSource.
+ */
+ String id = "id";
+ DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+ MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+ when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+ MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+ when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+ MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+ String gaugeName = "gauge";
+ String singleGaugeName = "source.id." + gaugeName;
+ long delta = 1;
+ String counterName = "counter";
+ String singleCounterName = "source.id." + counterName;
+ long count = 2;
+ source.decGauge(gaugeName, delta);
+ source.getMetricsContext();
+ source.getMetricsDescription();
+ source.getMetricsJmxContext();
+ source.getMetricsName();
+ source.incCounters(counterName, count);
+ source.incGauge(gaugeName, delta);
+ source.init();
+ source.removeMetric(gaugeName);
+ source.setGauge(gaugeName, delta);
+ source.updateHistogram(counterName, count);
+ source.updateQuantile(counterName, count);
+
+ verify(singleRms).decGauge(singleGaugeName, delta);
+ verify(globalRms).decGauge(gaugeName, delta);
+ verify(globalRms).getMetricsContext();
+ verify(globalRms).getMetricsJmxContext();
+ verify(globalRms).getMetricsName();
+ verify(singleRms).incCounters(singleCounterName, count);
+ verify(globalRms).incCounters(counterName, count);
+ verify(singleRms).incGauge(singleGaugeName, delta);
+ verify(globalRms).incGauge(gaugeName, delta);
+ verify(globalRms).init();
+ verify(singleRms).removeMetric(singleGaugeName);
+ verify(globalRms).removeMetric(gaugeName);
+ verify(singleRms).setGauge(singleGaugeName, delta);
+ verify(globalRms).setGauge(gaugeName, delta);
+ verify(singleRms).updateHistogram(singleCounterName, count);
+ verify(globalRms).updateHistogram(counterName, count);
+ verify(singleRms).updateQuantile(singleCounterName, count);
+ verify(globalRms).updateQuantile(counterName, count);
+
+ }
+
@Test (timeout=120000)
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
@@ -253,6 +320,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
return true;
}
+
@Override
public WALEntryFilter getWALEntryfilter() {
return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {