You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/03/17 17:44:33 UTC

[hbase] branch branch-2.4 updated: HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3018)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new c485a6b  HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3018)
c485a6b is described below

commit c485a6bb799be2075a48d057b3de2a89c7897e36
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Wed Mar 17 09:10:44 2021 -0700

    HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3018)
    
    Introduces a new metric that tracks number of replication sources that are stuck in initialization.
    
    Signed-off-by: Xu Cang <xu...@apache.org>
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
    (cherry picked from commit ff3821814aa79248ec82f1c07f9d624e7dfb2334)
---
 .../MetricsReplicationSourceSource.java            |  7 +++--
 .../MetricsReplicationGlobalSourceSourceImpl.java  | 20 +++++++++++-
 .../MetricsReplicationSourceSourceImpl.java        | 21 +++++++++++++
 .../metrics2/lib/DynamicMetricsRegistry.java       | 36 ++++++++++++++++++++--
 .../replication/regionserver/MetricsSource.java    | 32 ++++++++++++++++---
 .../regionserver/ReplicationSource.java            | 23 +++++++++-----
 .../regionserver/TestReplicationSource.java        | 33 +++++++++++++++++++-
 7 files changed, 155 insertions(+), 17 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index b5eb0aa..1360afe 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -52,8 +52,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
-  /* Used to track the age of oldest wal in ms since its creation time */
-  String OLDEST_WAL_AGE = "source.oldestWalAge";
+  // This is to track the num of replication sources getting initialized
+  public static final String SOURCE_INITIALIZING = "source.numInitializing";
 
   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -83,4 +83,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   long getEditsFiltered();
   void setOldestWalAge(long age);
   long getOldestWalAge();
+  void incrSourceInitializing();
+  void decrSourceInitializing();
+  int getSourceInitializing();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 9e69f18..6f31b4e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class MetricsReplicationGlobalSourceSourceImpl
-    implements MetricsReplicationGlobalSourceSource {
+  implements MetricsReplicationGlobalSourceSource {
   private static final String KEY_PREFIX = "source.";
 
   private final MetricsReplicationSourceImpl rms;
@@ -55,6 +56,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
   private final MutableGaugeLong walReaderBufferUsageBytes;
+  private final MutableGaugeInt sourceInitializing;
 
   public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -97,6 +99,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
 
     walReaderBufferUsageBytes = rms.getMetricsRegistry()
         .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -223,6 +226,21 @@ public class MetricsReplicationGlobalSourceSourceImpl
   }
 
   @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override
   public void init() {
     rms.init();
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index f9a907f..68c1f17 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,6 +46,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
   private final String oldestWalAgeKey;
+  private final String sourceInitializingKey;
 
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -73,6 +75,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableGaugeLong oldestWalAge;
+  private final MutableGaugeInt sourceInitializing;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -135,6 +138,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
     oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+    sourceInitializingKey = this.keyPrefix + "isInitializing";
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -201,6 +207,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
     rms.removeMetric(oldestWalAgeKey);
+    rms.removeMetric(sourceInitializingKey);
   }
 
   @Override
@@ -275,6 +282,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   }
 
   @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
   public void init() {
     rms.init();
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
index 7e17ee9..7a791c9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.metrics2.lib;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.hbase.metrics.Interns;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -30,7 +29,6 @@ import org.apache.hadoop.metrics2.impl.MsInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 
@@ -453,6 +451,40 @@ public class DynamicMetricsRegistry {
   }
 
   /**
+   * Get a MetricMutableGaugeInt from the storage.  If it is not there atomically put it.
+   *
+   * @param gaugeName              name of the gauge to create or get.
+   * @param potentialStartingValue value of the new gauge if we have to create it.
+   * @return the gauge registered under {@code gaugeName}: either the one created by this call or
+   *         the one that was already present.
+   * @throws MetricsException if a metric that is not a MutableGaugeInt is already registered
+   *                          under {@code gaugeName}.
+   */
+  public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
+    // Try and get the gauge.
+    MutableMetric metric = metricsMap.get(gaugeName);
+
+    // If it's not there then try and put a new one in the storage.
+    if (metric == null) {
+      // Create the potential new gauge.
+      MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
+        potentialStartingValue);
+
+      // Try and put the gauge in.  This is atomic.
+      metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+
+      // If the value we get back is null then the put was successful and we will return that.
+      // Otherwise metric holds whatever was registered before our put could be completed.
+      if (metric == null) {
+        return newGauge;
+      }
+    }
+
+    // Whatever was already registered under this name must itself be an int gauge.
+    if (!(metric instanceof MutableGaugeInt)) {
+      throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName +
+        " and not of type MetricMutableGaugeInt");
+    }
+
+    return (MutableGaugeInt) metric;
+  }
+
+  /**
    * Get a MetricMutableCounterLong from the storage.  If it is not there atomically put it.
    *
    * @param counterName            Name of the counter to get
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index f11b7de..036be91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -21,13 +21,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +61,8 @@ public class MetricsSource implements BaseSource {
     singleSourceSource =
         CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
             .getSource(id);
-    globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
+    globalSourceSource = CompatibilitySingletonFactory
+      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
     singleSourceSourceByTable = new HashMap<>();
   }
 
@@ -169,6 +169,22 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Increment the count for initializing sources
+   */
+  public void incrSourceInitializing() {
+    singleSourceSource.incrSourceInitializing();
+    globalSourceSource.incrSourceInitializing();
+  }
+
+  /**
+   * Decrement the count for initializing sources
+   */
+  public void decrSourceInitializing() {
+    singleSourceSource.decrSourceInitializing();
+    globalSourceSource.decrSourceInitializing();
+  }
+
+  /**
    * Add on the the number of log edits read
    *
    * @param delta the number of log edits read.
@@ -336,6 +352,14 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Get the initializing gauge of this replication source. The value is read from the
+   * per-source metrics only; the global metrics source is not consulted.
+   * @return current value of this source's initializing gauge
+   */
+  public int getSourceInitializing() {
+    return singleSourceSource.getSourceInitializing();
+  }
+
+  /**
    * Get the slave peer ID
    * @return peerID
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8c7f0a6..2317663 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -548,7 +548,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
           sleepMultiplier++;
         } else {
           retryStartup.set(!this.abortOnError);
-          this.startupOngoing.set(false);
+          setSourceStartupStatus(false);
           throw new RuntimeException("Exhausted retries to start replication endpoint.");
         }
       }
@@ -556,7 +556,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
     if (!this.isSourceActive()) {
       retryStartup.set(!this.abortOnError);
-      this.startupOngoing.set(false);
+      setSourceStartupStatus(false);
       throw new IllegalStateException("Source should be active.");
     }
 
@@ -580,7 +580,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
     if(!this.isSourceActive()) {
       retryStartup.set(!this.abortOnError);
-      this.startupOngoing.set(false);
+      setSourceStartupStatus(false);
       throw new IllegalStateException("Source should be active.");
     }
     LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
@@ -591,7 +591,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
     for (String walGroupId: logQueue.getQueues().keySet()) {
       tryStartNewShipper(walGroupId);
     }
-    this.startupOngoing.set(false);
+    setSourceStartupStatus(false);
+  }
+
+  /**
+   * Records whether this source is currently starting up and keeps the "initializing" gauges in
+   * step with that flag. The gauges are moved only when the flag actually changes. The failure
+   * paths clear the flag right before throwing and the retry handler's catch block clears it
+   * again for the same attempt; without this guard the second call would decrement the gauges
+   * a second time and drive them negative.
+   *
+   * @param initializing true when a startup attempt begins, false when it completes or fails
+   */
+  private synchronized void setSourceStartupStatus(boolean initializing) {
+    // getAndSet returns the previous state; if nothing changed there is nothing to count.
+    if (startupOngoing.getAndSet(initializing) == initializing) {
+      return;
+    }
+    if (initializing) {
+      metrics.incrSourceInitializing();
+    } else {
+      metrics.decrSourceInitializing();
+    }
   }
 
   @Override
@@ -600,7 +609,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       return this;
     }
     this.sourceRunning = true;
-    startupOngoing.set(true);
+    setSourceStartupStatus(true);
     initThread = new Thread(this::initialize);
     Threads.setDaemonThreadRunning(initThread,
       Thread.currentThread().getName() + ".replicationSource," + this.queueId,
@@ -614,12 +623,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
         do {
           if(retryStartup.get()) {
             this.sourceRunning = true;
-            startupOngoing.set(true);
+            setSourceStartupStatus(true);
             retryStartup.set(false);
             try {
               initialize();
             } catch(Throwable error){
-              sourceRunning = false;
+              setSourceStartupStatus(false);
               uncaughtException(t, error, null, null);
               retryStartup.set(!this.abortOnError);
             }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index bb2b1da..484bd5b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -457,7 +457,7 @@ public class TestReplicationSource {
 
     @Override
     public synchronized UUID getPeerUUID() {
-      if(count==0) {
+      if (count==0) {
         count++;
         throw new RuntimeException();
       } else {
@@ -467,6 +467,18 @@ public class TestReplicationSource {
 
   }
 
+  /**
+   * Bad Endpoint with failing connection to peer on demand. While {@link #failing} is true,
+   * {@link #getPeerUUID()} returns null; once it is false the call is delegated to the parent.
+   */
+  public static class BadReplicationEndpoint extends DoNothingReplicationEndpoint {
+    // Flipped by the test thread and read here without a common lock, so it has to be volatile
+    // for the change to be reliably visible to whichever thread calls getPeerUUID().
+    static volatile boolean failing = true;
+
+    @Override
+    public synchronized UUID getPeerUUID() {
+      return failing ? null : super.getPeerUUID();
+    }
+  }
+
   public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {
 
     static int count = 0;
@@ -556,6 +568,25 @@ public class TestReplicationSource {
     }
   }
 
+  /**
+   * Test that the per-source initializing metric reads 1 while the endpoint keeps returning a
+   * null peer UUID, and goes back to 0 once the endpoint starts answering.
+   */
+  @Test
+  public void testReplicationSourceInitializingMetric() throws IOException {
+    // The flag is static and this test switches it off below; re-arm it so that a rerun in the
+    // same JVM starts from a failing endpoint again.
+    BadReplicationEndpoint.failing = true;
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean("replication.source.regionserver.abort", false);
+    ReplicationSource rs = new ReplicationSource();
+    RegionServerServices rss = setupForAbortTests(rs, conf,
+      BadReplicationEndpoint.class.getName());
+    try {
+      rs.startup();
+      assertTrue(rs.isSourceActive());
+      // Endpoint is failing: the source must report itself as initializing.
+      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 1);
+      BadReplicationEndpoint.failing = false;
+      // Endpoint recovered: the gauge must drop back to zero.
+      Waiter.waitFor(conf, 1000, () -> rs.getSourceMetrics().getSourceInitializing() == 0);
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
   /**
    * Test ReplicationSource keeps retrying startup indefinitely without blocking the main thread,
    * when <b>eplication.source.regionserver.abort</b> is set to false.