You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/12/14 00:15:05 UTC
[37/50] [abbrv] hbase git commit: HBASE-20858 Port HBASE-20695
(Implement table level RegionServer replication metrics) to branch-1
HBASE-20858 Port HBASE-20695 (Implement table level RegionServer replication metrics) to branch-1
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e39a200
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e39a200
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e39a200
Branch: refs/heads/branch-1.3
Commit: 9e39a2009648ba21d55676ea90b328941bb9b7db
Parents: 614b5f6
Author: Xu Cang <xc...@salesforce.com>
Authored: Fri Jul 6 16:36:05 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Dec 12 18:08:20 2018 -0800
----------------------------------------------------------------------
.../replication/regionserver/MetricsSource.java | 43 +++++++++++++---
.../regionserver/ReplicationSource.java | 5 ++
.../replication/TestReplicationEndpoint.java | 52 ++++++++++++++++++++
3 files changed, 94 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 9b99f2a..56baa05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -35,8 +33,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource {
- private static final Log LOG = LogFactory.getLog(MetricsSource.class);
-
// tracks last shipped timestamp for each wal group
private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private long lastHFileRefsQueueSize = 0;
@@ -44,7 +40,7 @@ public class MetricsSource {
private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
-
+ private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
/**
* Constructor used to register the metrics
@@ -56,7 +52,24 @@ public class MetricsSource {
singleSourceSource =
CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
.getSource(id);
- globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+ globalSourceSource = CompatibilitySingletonFactory
+ .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+ singleSourceSourceByTable = new HashMap<>();
+ }
+
+ /**
+ * Constructor for injecting custom (or test) MetricsReplicationSourceSources
+ * @param id Name of the source this class is monitoring
+ * @param singleSourceSource Class to monitor id-scoped metrics
+ * @param globalSourceSource Class to monitor global-scoped metrics
+ */
+ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
+ MetricsReplicationSourceSource globalSourceSource,
+ Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
+ this.id = id;
+ this.singleSourceSource = singleSourceSource;
+ this.globalSourceSource = globalSourceSource;
+ this.singleSourceSourceByTable = singleSourceSourceByTable;
}
/**
@@ -72,6 +85,20 @@ public class MetricsSource {
}
/**
+ * Set the age of the last edit that was shipped, tracked per table
+ * @param timestamp write time of the edit
+ * @param tableName name of the table, used as the key of the per-table metrics source
+ */
+ public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
+ long age = EnvironmentEdgeManager.currentTime() - timestamp;
+ if (!this.getSingleSourceSourceByTable().containsKey(tableName)) {
+ this.getSingleSourceSourceByTable().put(tableName,
+ CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
+ .getSource(tableName));
+ }
+ this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age);
+ }
+ /**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.
* @param walGroupId id of the group to update
@@ -262,4 +289,8 @@ public class MetricsSource {
singleSourceSource.incrCompletedRecoveryQueue();
globalSourceSource.incrCompletedRecoveryQueue();
}
+
+ public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
+ return singleSourceSourceByTable;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 78b465c..8112553 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1066,6 +1066,11 @@ public class ReplicationSource extends Thread
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
+
+ TableName tableName = entries.get(i).getKey().getTablename();
+ source.getSourceMetrics().setAgeOfLastShippedOpByTable(
+ entries.get(i).getKey().getWriteTime(),
+ tableName.getNameAsString());
}
//Log and clean up WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e39a200/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index d570549..c3822c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -18,9 +18,15 @@
package org.apache.hadoop.hbase.replication;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,10 +45,16 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -253,6 +265,46 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
}
+ @Test
+ public void testMetricsSourceBaseSourcePassthrough(){
+ /*
+ The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
+ and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
+ Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
+ allows for custom JMX metrics.
+ This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
+ the two layers of wrapping to the actual BaseSource.
+ */
+ String id = "id";
+ DynamicMetricsRegistry mockRegistry = new DynamicMetricsRegistry(id);
+ MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
+ when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
+ MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
+ when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
+
+ MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
+ MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
+ singleSourceSourceByTable);
+
+ // check singleSourceSourceByTable metrics.
+ // singleSourceSourceByTable map entry will be created only
+ // after calling #setAgeOfLastShippedOpByTable
+ boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
+ .containsKey("RandomNewTable");
+ Assert.assertEquals(false, containsRandomNewTable);
+ source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
+ containsRandomNewTable = source.getSingleSourceSourceByTable()
+ .containsKey("RandomNewTable");
+ Assert.assertEquals(true, containsRandomNewTable);
+ MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
+ .get("RandomNewTable");
+ // Cannot assert a more concrete value here because the age depends on the current time.
+ // As long as it is greater than 0, we treat it as the correct answer.
+ Assert.assertTrue(msr.getLastShippedAge() > 0);
+ }
private void doPut(byte[] row) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(conf1)) {