You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/08/24 01:02:08 UTC
[2/3] hbase git commit: HBASE-16448 Custom metrics for custom
replication endpoints
HBASE-16448 Custom metrics for custom replication endpoints
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e9b49ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e9b49ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e9b49ca
Branch: refs/heads/branch-1
Commit: 6e9b49cac7b118732659eef0cebb804be3e16238
Parents: 1e15fa5
Author: Geoffrey <gj...@salesforce.com>
Authored: Tue Aug 23 14:42:07 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Aug 23 17:17:42 2016 -0700
----------------------------------------------------------------------
.../MetricsReplicationSourceSource.java | 4 +-
.../MetricsReplicationGlobalSourceSource.java | 56 ++++++++++++++
.../MetricsReplicationSourceSourceImpl.java | 80 +++++++++++++++++---
.../replication/regionserver/MetricsSource.java | 78 ++++++++++++++++++-
.../replication/TestReplicationEndpoint.java | 67 +++++++++++++++-
5 files changed, 269 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 271f0ac..c877608 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
-public interface MetricsReplicationSourceSource {
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 476d2f7..d595ca9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -143,4 +143,60 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 835e81c..5a6a103 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,6 +30,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private String keyPrefix;
+
@Deprecated
private final String shippedKBsKey;
private final String shippedBytesKey;
@@ -52,38 +54,39 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
this.id = id;
+ this.keyPrefix = "source." + this.id + ".";
- ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
+ ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
ageOfLastShippedOpGauge = rms.getMetricsRegistry().getGauge(ageOfLastShippedOpKey, 0L);
- sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue";
+ sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue";
sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfLogQueueKey, 0L);
- shippedBatchesKey = "source." + this.id + ".shippedBatches";
+ shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
- shippedOpsKey = "source." + this.id + ".shippedOps";
+ shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
- shippedKBsKey = "source." + this.id + ".shippedKBs";
+ shippedKBsKey = this.keyPrefix + "shippedKBs";
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
- shippedBytesKey = "source." + this.id + ".shippedBytes";
+ shippedBytesKey = this.keyPrefix + "shippedBytes";
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
- logReadInBytesKey = "source." + this.id + ".logReadInBytes";
+ logReadInBytesKey = this.keyPrefix + "logReadInBytes";
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
- logReadInEditsKey = "source." + id + ".logEditsRead";
+ logReadInEditsKey = this.keyPrefix + "logEditsRead";
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(logReadInEditsKey, 0L);
- logEditsFilteredKey = "source." + id + ".logEditsFiltered";
+ logEditsFilteredKey = this.keyPrefix + "logEditsFiltered";
logEditsFilteredCounter = rms.getMetricsRegistry().getCounter(logEditsFilteredKey, 0L);
- shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+ shippedHFilesKey = this.keyPrefix + "shippedHFiles";
shippedHFilesCounter = rms.getMetricsRegistry().getCounter(shippedHFilesKey, 0L);
- sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+ sizeOfHFileRefsQueueKey = this.keyPrefix + "sizeOfHFileRefsQueue";
sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getGauge(sizeOfHFileRefsQueueKey, 0L);
}
@@ -168,4 +171,59 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
public int getSizeOfLogQueue() {
return (int)sizeOfLogQueueGauge.value();
}
+
+ @Override
+ public void init() {
+ rms.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ rms.setGauge(this.keyPrefix + gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ rms.incGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ rms.decGauge(this.keyPrefix + gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ rms.removeMetric(this.keyPrefix + key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ rms.incCounters(this.keyPrefix + counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ rms.updateHistogram(this.keyPrefix + name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return rms.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return rms.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return rms.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return rms.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index b07f1d1..7dfeff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* through the metrics interfaces.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public class MetricsSource {
+public class MetricsSource implements BaseSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
@@ -61,6 +62,19 @@ public class MetricsSource {
}
/**
+ * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+ * @param id Name of the source this class is monitoring
+ * @param singleSourceSource Class to monitor id-scoped metrics
+ * @param globalSourceSource Class to monitor global-scoped metrics
+ */
+ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+ MetricsReplicationSourceSource globalSourceSource) {
+ this.id = id;
+ this.singleSourceSource = singleSourceSource;
+ this.globalSourceSource = globalSourceSource;
+ }
+
+ /**
* Set the age of the last edit that was shipped
* @param timestamp write time of the edit
* @param walGroup which group we are setting
@@ -227,4 +241,66 @@ public class MetricsSource {
lastHFileRefsQueueSize = 0;
}
}
+
+ @Override
+ public void init() {
+ singleSourceSource.init();
+ globalSourceSource.init();
+ }
+
+ @Override
+ public void setGauge(String gaugeName, long value) {
+ singleSourceSource.setGauge(gaugeName, value);
+ globalSourceSource.setGauge(gaugeName, value);
+ }
+
+ @Override
+ public void incGauge(String gaugeName, long delta) {
+ singleSourceSource.incGauge(gaugeName, delta);
+ globalSourceSource.incGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void decGauge(String gaugeName, long delta) {
+ singleSourceSource.decGauge(gaugeName, delta);
+ globalSourceSource.decGauge(gaugeName, delta);
+ }
+
+ @Override
+ public void removeMetric(String key) {
+ singleSourceSource.removeMetric(key);
+ globalSourceSource.removeMetric(key);
+ }
+
+ @Override
+ public void incCounters(String counterName, long delta) {
+ singleSourceSource.incCounters(counterName, delta);
+ globalSourceSource.incCounters(counterName, delta);
+ }
+
+ @Override
+ public void updateHistogram(String name, long value) {
+ singleSourceSource.updateHistogram(name, value);
+ globalSourceSource.updateHistogram(name, value);
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return globalSourceSource.getMetricsContext();
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return globalSourceSource.getMetricsDescription();
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return globalSourceSource.getMetricsJmxContext();
+ }
+
+ @Override
+ public String getMetricsName() {
+ return globalSourceSource.getMetricsName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9b49ca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..5a54314 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -38,11 +38,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -50,6 +51,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@@ -114,8 +119,8 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
- .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@@ -254,6 +259,62 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
+ @Test
+ public void testMetricsSourceBaseSourcePassthrough(){
+ /*
+ The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+ and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+ Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+ allows for custom JMX metrics.
+ This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+ the two layers of wrapping to the actual BaseSource.
+ */
+ String id = "id";
+ DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
+ MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+ when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+ MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+ when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+ MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource);
+ String gaugeName = "gauge";
+ String singleGaugeName = "source.id." + gaugeName;
+ long delta = 1;
+ String counterName = "counter";
+ String singleCounterName = "source.id." + counterName;
+ long count = 2;
+ source.decGauge(gaugeName, delta);
+ source.getMetricsContext();
+ source.getMetricsDescription();
+ source.getMetricsJmxContext();
+ source.getMetricsName();
+ source.incCounters(counterName, count);
+ source.incGauge(gaugeName, delta);
+ source.init();
+ source.removeMetric(gaugeName);
+ source.setGauge(gaugeName, delta);
+ source.updateHistogram(counterName, count);
+
+ verify(singleRms).decGauge(singleGaugeName, delta);
+ verify(globalRms).decGauge(gaugeName, delta);
+ verify(globalRms).getMetricsContext();
+ verify(globalRms).getMetricsJmxContext();
+ verify(globalRms).getMetricsName();
+ verify(singleRms).incCounters(singleCounterName, count);
+ verify(globalRms).incCounters(counterName, count);
+ verify(singleRms).incGauge(singleGaugeName, delta);
+ verify(globalRms).incGauge(gaugeName, delta);
+ verify(globalRms).init();
+ verify(singleRms).removeMetric(singleGaugeName);
+ verify(globalRms).removeMetric(gaugeName);
+ verify(singleRms).setGauge(singleGaugeName, delta);
+ verify(globalRms).setGauge(gaugeName, delta);
+ verify(singleRms).updateHistogram(singleCounterName, count);
+ verify(globalRms).updateHistogram(counterName, count);
+ }
+
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
doPut(connection, row);