You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/03/23 05:55:35 UTC
[hbase] branch branch-1 updated: HBASE-25627: HBase replication
should have a metric to represent if the source is stuck getting
initialized (#3009)
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 97c152e HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)
97c152e is described below
commit 97c152ea7a664650423cc60250bc9bbd06a8ff00
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Mon Mar 22 22:55:04 2021 -0700
HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../MetricsReplicationSourceSource.java | 7 +++--
.../MetricsReplicationGlobalSourceSource.java | 18 ++++++++++++
.../MetricsReplicationSourceSourceImpl.java | 21 +++++++++++++
.../metrics2/lib/DynamicMetricsRegistry.java | 34 ++++++++++++++++++++++
.../replication/regionserver/MetricsSource.java | 30 +++++++++++++++++--
.../regionserver/ReplicationSource.java | 19 ++++++++++--
.../hbase/replication/TestReplicationEndpoint.java | 16 ++++++++++
.../hbase/replication/TestReplicationSource.java | 25 +++++++++++++++-
8 files changed, 162 insertions(+), 8 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index a7cea25..b997338 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -50,8 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
- /* Used to track the age of oldest wal in ms since its creation time */
- String OLDEST_WAL_AGE = "source.oldestWalAge";
+ // This is to track the num of replication sources getting initialized
+ public static final String SOURCE_INITIALIZING = "source.numInitializing";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -78,4 +78,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void incrFailedRecoveryQueue();
void setOldestWalAge(long age);
long getOldestWalAge();
+ void incrSourceInitializing();
+ void decrSourceInitializing();
+ int getSourceInitializing();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index df774d3..ccdf1be 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
@@ -46,6 +47,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
+ private final MutableGaugeInt sourceInitializing;
public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
@@ -82,6 +84,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+ sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
}
@Override public void setLastShippedAge(long age) {
@@ -209,6 +212,21 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
}
@Override
+ public void incrSourceInitializing() {
+ sourceInitializing.incr(1);
+ }
+
+ @Override
+ public void decrSourceInitializing() {
+ sourceInitializing.decr(1);
+ }
+
+ @Override
+ public int getSourceInitializing() {
+ return sourceInitializing.value();
+ }
+
+ @Override
public void init() {
rms.init();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index c593950..5662ee9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
@@ -41,6 +42,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
private final String oldestWalAgeKey;
+ private final String sourceInitializingKey;
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -69,6 +71,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableGaugeLong oldestWalAge;
+ private final MutableGaugeInt sourceInitializing;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
@@ -131,6 +134,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+ sourceInitializingKey = this.keyPrefix + "isInitializing";
+ sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
}
@Override public void setLastShippedAge(long age) {
@@ -197,6 +203,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
rms.removeMetric(oldestWalAgeKey);
+ rms.removeMetric(sourceInitializingKey);
}
@Override
@@ -271,6 +278,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
}
@Override
+ public void incrSourceInitializing() {
+ sourceInitializing.incr(1);
+ }
+
+ @Override
+ public int getSourceInitializing() {
+ return sourceInitializing.value();
+ }
+
+ @Override public void decrSourceInitializing() {
+ sourceInitializing.decr(1);
+ }
+
+ @Override
public void init() {
rms.init();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
index 977536a..3840fe4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
@@ -450,6 +450,40 @@ public class DynamicMetricsRegistry {
}
/**
+ * Get a MetricMutableGaugeInt from the storage. If it is not there atomically put it.
+ *
+ * @param gaugeName name of the gauge to create or get.
+ * @param potentialStartingValue value of the new gauge if we have to create it.
+ * @return the gauge registered under gaugeName, either the pre-existing one or the newly
+ *         created one
+ * @throws MetricsException if a metric with that name exists but is not a MutableGaugeInt
+ */
+ public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
+ //Try and get the gauge.
+ MutableMetric metric = metricsMap.get(gaugeName);
+
+ //If it's not there then try and put a new one in the storage.
+ if (metric == null) {
+ //Create the potential new gauge.
+ MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
+ potentialStartingValue);
+
+ // Try and put the gauge in. This is atomic.
+ metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+
+ //If the value we get back is null then the put was successful and we will return that.
+ //Otherwise metric holds whatever was registered before our put could be completed.
+ if (metric == null) {
+ return newGauge;
+ }
+ }
+
+ // The name may already be taken by a metric of a different kind; refuse to hand that back.
+ if (!(metric instanceof MutableGaugeInt)) {
+ throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName +
+ " and not of type MetricMutableGaugeInt");
+ }
+
+ return (MutableGaugeInt) metric;
+ }
+
+ /**
* Get a MetricMutableCounterLong from the storage. If it is not there atomically put it.
*
* @param counterName Name of the counter to get
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 83bc653..308a746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,10 +22,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,7 +133,8 @@ public class MetricsSource implements BaseSource {
* @param tableName String as group and tableName
*/
public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
- getSourceForTable(tableName).setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
+ getSourceForTable(tableName)
+ .setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
}
/**
@@ -185,6 +185,22 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Increment the count for initializing sources
+ */
+ public void incrSourceInitializing() {
+ singleSourceSource.incrSourceInitializing();
+ globalSourceSource.incrSourceInitializing();
+ }
+
+ /**
+ * Decrement the count for initializing sources
+ */
+ public void decrSourceInitializing() {
+ singleSourceSource.decrSourceInitializing();
+ globalSourceSource.decrSourceInitializing();
+ }
+
+ /**
* Add on the the number of log edits read
*
* @param delta the number of log edits read.
@@ -291,6 +307,14 @@ public class MetricsSource implements BaseSource {
}
/**
+ * Get the source initializing counts
+ * @return number of replication sources getting initialized
+ */
+ public int getSourceInitializing() {
+ return singleSourceSource.getSourceInitializing();
+ }
+
+ /**
* Get the slave peer ID
* @return peerID
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index de3b7f6..a121e65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -119,6 +117,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private int maxRetriesMultiplier;
// Indicates if this particular source is running
private volatile boolean sourceRunning = false;
+ // Indicates if the source initialization is in progress
+ private volatile boolean startupOngoing = false;
// Metrics for this source
private MetricsSource metrics;
// ReplicationEndpoint which will handle the actual replication
@@ -266,16 +266,19 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void run() {
// mark we are running now
this.sourceRunning = true;
+ this.setSourceStartupStatus(true);
try {
// start the endpoint, connect to the cluster
Service.State state = replicationEndpoint.start().get();
if (state != Service.State.RUNNING) {
LOG.warn("ReplicationEndpoint was not started. Exiting");
uninitialize();
+ this.setSourceStartupStatus(false);
return;
}
} catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+ this.setSourceStartupStatus(false);
throw new RuntimeException(ex);
}
@@ -300,6 +303,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
if (!this.isSourceActive()) {
+ this.setSourceStartupStatus(false);
return;
}
@@ -310,6 +314,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
+ this.setSourceStartupStatus(false);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -327,6 +332,16 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
worker.startup();
}
}
+ this.setSourceStartupStatus(false);
+ }
+
+ private synchronized void setSourceStartupStatus(boolean initializing) {
+ startupOngoing = initializing;
+ if (initializing) {
+ metrics.incrSourceInitializing();
+ } else {
+ metrics.decrSourceInitializing();
+ }
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 3819877..9787120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -419,6 +419,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
cells.get(0).getRowLength(), row, 0, row.length));
}
+ /**
+ * Bad Endpoint with failing connection to peer on demand. While {@code failing} is true,
+ * {@link #getPeerUUID()} returns null; otherwise it delegates to the parent implementation.
+ */
+ public static class BadReplicationEndpoint extends ReplicationEndpointForTest {
+ // Written by test code outside this class without holding this endpoint's monitor and read
+ // by whichever thread calls getPeerUUID(), so it is volatile to guarantee the reader sees
+ // the latest value.
+ static volatile boolean failing = true;
+
+ public BadReplicationEndpoint() {
+ super();
+ }
+
+ @Override
+ public synchronized UUID getPeerUUID() {
+ return failing ? null : super.getPeerUUID();
+ }
+ }
+
public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
static UUID uuid = UUID.randomUUID();
static AtomicInteger contructedCount = new AtomicInteger();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index e7ff58f..ca09aa8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -863,7 +863,7 @@ public class TestReplicationSource {
}
}
- /*
+ /**
Test age of oldest wal metric.
*/
@Test
@@ -898,6 +898,29 @@ public class TestReplicationSource {
}
}
+ /**
+ * Asserts that the per-source initializing gauge reads 1 while the endpoint returns no peer
+ * UUID, and drops back to 0 after the endpoint is switched to return one.
+ */
+ @Test
+ public void testReplicationSourceInitializingMetric() throws Exception {
+ String id = "1";
+ // The flag is static and this test leaves it false on exit. Reset it so a rerun in the same
+ // JVM starts with a failing endpoint; otherwise the gauge might never be observed at 1.
+ TestReplicationEndpoint.BadReplicationEndpoint.failing = true;
+ MetricsSource metrics = Mockito.spy(new MetricsSource(id));
+ Mocks mocks = new Mocks();
+ ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics,
+ new TestReplicationEndpoint.BadReplicationEndpoint());
+ source.startup();
+ final MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+ // Endpoint is failing: wait until the gauge reports one source initializing.
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return metricsSource1.getSourceInitializing() == 1;
+ }
+ });
+ // Let the endpoint return a peer UUID, then wait for the gauge to go back to zero.
+ TestReplicationEndpoint.BadReplicationEndpoint.failing = false;
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return metricsSource1.getSourceInitializing() == 0;
+ }
+ });
+ metrics.clear();
+ }
+
private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class);