You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2021/03/23 05:55:35 UTC

[hbase] branch branch-1 updated: HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 97c152e  HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)
97c152e is described below

commit 97c152ea7a664650423cc60250bc9bbd06a8ff00
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Mon Mar 22 22:55:04 2021 -0700

    HBASE-25627: HBase replication should have a metric to represent if the source is stuck getting initialized (#3009)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../MetricsReplicationSourceSource.java            |  7 +++--
 .../MetricsReplicationGlobalSourceSource.java      | 18 ++++++++++++
 .../MetricsReplicationSourceSourceImpl.java        | 21 +++++++++++++
 .../metrics2/lib/DynamicMetricsRegistry.java       | 34 ++++++++++++++++++++++
 .../replication/regionserver/MetricsSource.java    | 30 +++++++++++++++++--
 .../regionserver/ReplicationSource.java            | 19 ++++++++++--
 .../hbase/replication/TestReplicationEndpoint.java | 16 ++++++++++
 .../hbase/replication/TestReplicationSource.java   | 25 +++++++++++++++-
 8 files changed, 162 insertions(+), 8 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index a7cea25..b997338 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -50,8 +50,8 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
   public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
   public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
-  /* Used to track the age of oldest wal in ms since its creation time */
-  String OLDEST_WAL_AGE = "source.oldestWalAge";
+  // This is to track the num of replication sources getting initialized
+  public static final String SOURCE_INITIALIZING = "source.numInitializing";
 
   void setLastShippedAge(long age);
   void incrSizeOfLogQueue(int size);
@@ -78,4 +78,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void incrFailedRecoveryQueue();
   void setOldestWalAge(long age);
   long getOldestWalAge();
+  void incrSourceInitializing();
+  void decrSourceInitializing();
+  int getSourceInitializing();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index df774d3..ccdf1be 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 
@@ -46,6 +47,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableFastCounter failedRecoveryQueue;
+  private final MutableGaugeInt sourceInitializing;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -82,6 +84,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     completedRecoveryQueue = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
     failedRecoveryQueue = rms.getMetricsRegistry()
         .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(SOURCE_INITIALIZING, 0);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -209,6 +212,21 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   }
 
   @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override
   public void init() {
     rms.init();
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index c593950..5662ee9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 
@@ -41,6 +42,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
   private final String oldestWalAgeKey;
+  private final String sourceInitializingKey;
 
   private final MutableHistogram ageOfLastShippedOpHist;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -69,6 +71,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter completedWAL;
   private final MutableFastCounter completedRecoveryQueue;
   private final MutableGaugeLong oldestWalAge;
+  private final MutableGaugeInt sourceInitializing;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -131,6 +134,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
     oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
+
+    sourceInitializingKey = this.keyPrefix + "isInitializing";
+    sourceInitializing = rms.getMetricsRegistry().getGaugeInt(sourceInitializingKey, 0);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -197,6 +203,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(completedLogsKey);
     rms.removeMetric(completedRecoveryKey);
     rms.removeMetric(oldestWalAgeKey);
+    rms.removeMetric(sourceInitializingKey);
   }
 
   @Override
@@ -271,6 +278,20 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   }
 
   @Override
+  public void incrSourceInitializing() {
+    sourceInitializing.incr(1);
+  }
+
+  @Override
+  public int getSourceInitializing() {
+    return sourceInitializing.value();
+  }
+
+  @Override public void decrSourceInitializing() {
+    sourceInitializing.decr(1);
+  }
+
+  @Override
   public void init() {
     rms.init();
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
index 977536a..3840fe4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DynamicMetricsRegistry.java
@@ -450,6 +450,40 @@ public class DynamicMetricsRegistry {
   }
 
   /**
+   * Get a MutableGaugeInt from the storage.  If it is not there, atomically put it.
+   *
+   * @param gaugeName              name of the gauge to create or get.
+   * @param potentialStartingValue value of the new gauge if we have to create it.
+   * @return the gauge stored under gaugeName, either pre-existing or newly created.
+   * @throws MetricsException if gaugeName already maps to a metric of another type.
+   */
+  public MutableGaugeInt getGaugeInt(String gaugeName, int potentialStartingValue) {
+    // Try and get the gauge.
+    MutableMetric metric = metricsMap.get(gaugeName);
+
+    // If it's not there then try and put a new one in the storage.
+    if (metric == null) {
+      // Create the potential new gauge.
+      MutableGaugeInt newGauge = new MutableGaugeInt(new MetricsInfoImpl(gaugeName, ""),
+        potentialStartingValue);
+
+      // putIfAbsent returns null when our gauge was stored, else the entry already present.
+      metric = metricsMap.putIfAbsent(gaugeName, newGauge);
+      if (metric == null) {
+        return newGauge;
+      }
+    }
+
+    // An entry already existed under this name; it must be an int gauge to be returned.
+    if (!(metric instanceof MutableGaugeInt)) {
+      throw new MetricsException("Metric already exists in registry for metric name: " + gaugeName +
+        " and not of type MutableGaugeInt");
+    }
+
+    return (MutableGaugeInt) metric;
+  }
+
+  /**
    * Get a MetricMutableCounterLong from the storage.  If it is not there atomically put it.
    *
    * @param counterName            Name of the counter to get
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 83bc653..308a746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -22,10 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,7 +133,8 @@ public class MetricsSource implements BaseSource {
    * @param tableName String as group and tableName
    */
   public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) {
-    getSourceForTable(tableName).setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
+    getSourceForTable(tableName)
+      .setLastShippedAge(EnvironmentEdgeManager.currentTime() - timestamp);
   }
 
   /**
@@ -185,6 +185,22 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Increment the count for initializing sources
+   */
+  public void incrSourceInitializing() {
+    singleSourceSource.incrSourceInitializing();
+    globalSourceSource.incrSourceInitializing();
+  }
+
+  /**
+   * Decrement the count for initializing sources
+   */
+  public void decrSourceInitializing() {
+    singleSourceSource.decrSourceInitializing();
+    globalSourceSource.decrSourceInitializing();
+  }
+
+  /**
    * Add on the the number of log edits read
    *
    * @param delta the number of log edits read.
@@ -291,6 +307,14 @@ public class MetricsSource implements BaseSource {
   }
 
   /**
+   * Get the source initializing counts
+   * @return number of replication sources getting initialized
+   */
+  public int getSourceInitializing() {
+    return singleSourceSource.getSourceInitializing();
+  }
+
+  /**
    * Get the slave peer ID
    * @return peerID
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index de3b7f6..a121e65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -119,6 +117,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   private int maxRetriesMultiplier;
   // Indicates if this particular source is running
   private volatile boolean sourceRunning = false;
+  // Indicates if the source initialization is in progress
+  private volatile boolean startupOngoing = false;
   // Metrics for this source
   private MetricsSource metrics;
   // ReplicationEndpoint which will handle the actual replication
@@ -266,16 +266,19 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   public void run() {
     // mark we are running now
     this.sourceRunning = true;
+    this.setSourceStartupStatus(true);
     try {
       // start the endpoint, connect to the cluster
       Service.State state = replicationEndpoint.start().get();
       if (state != Service.State.RUNNING) {
         LOG.warn("ReplicationEndpoint was not started. Exiting");
         uninitialize();
+        this.setSourceStartupStatus(false);
         return;
       }
     } catch (Exception ex) {
       LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
+      this.setSourceStartupStatus(false);
       throw new RuntimeException(ex);
     }
 
@@ -300,6 +303,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     }
 
     if (!this.isSourceActive()) {
+      this.setSourceStartupStatus(false);
       return;
     }
 
@@ -310,6 +314,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
       this.manager.closeQueue(this);
+      this.setSourceStartupStatus(false);
       return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
@@ -327,6 +332,16 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
         worker.startup();
       }
     }
+    this.setSourceStartupStatus(false);
+  }
+
+  private synchronized void setSourceStartupStatus(boolean initializing) {
+    startupOngoing = initializing;
+    if (initializing) {
+      metrics.incrSourceInitializing();
+    } else {
+      metrics.decrSourceInitializing();
+    }
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 3819877..9787120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -419,6 +419,22 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       cells.get(0).getRowLength(), row, 0, row.length));
   }
 
+  /**
+   * Bad Endpoint with failing connection to peer on demand: no peer UUID while failing is true.
+   */
+  public static class BadReplicationEndpoint extends ReplicationEndpointForTest {
+    static volatile boolean failing = true; // flipped by tests; reader may be another thread
+
+    public BadReplicationEndpoint() {
+      super();
+    }
+
+    @Override
+    public synchronized UUID getPeerUUID() {
+      return failing ? null : super.getPeerUUID();
+    }
+  }
+
   public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
     static UUID uuid = UUID.randomUUID();
     static AtomicInteger contructedCount = new AtomicInteger();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index e7ff58f..ca09aa8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -863,7 +863,7 @@ public class TestReplicationSource {
     }
   }
 
-  /*
+  /**
   Test age of oldest wal metric.
   */
   @Test
@@ -898,6 +898,29 @@ public class TestReplicationSource {
     }
   }
 
+  @Test
+  public void testReplicationSourceInitializingMetric() throws Exception {
+    String id = "1";
+    MetricsSource metrics = Mockito.spy(new MetricsSource(id));
+    Mocks mocks = new Mocks();
+    ReplicationSource source = mocks.createReplicationSourceWithMocks(metrics,
+      new TestReplicationEndpoint.BadReplicationEndpoint());
+    source.startup();
+    final MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return metricsSource1.getSourceInitializing() == 1;
+      }
+    });
+    TestReplicationEndpoint.BadReplicationEndpoint.failing = false;
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return metricsSource1.getSourceInitializing() == 0;
+      }
+    });
+    metrics.clear();
+  }
+
   private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
     MetricsReplicationSourceFactory factory = CompatibilitySingletonFactory
       .getInstance(MetricsReplicationSourceFactory.class);