You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/08/25 10:42:29 UTC

[hbase] branch master updated: HBASE-26180 Introduce a initial refresh interval for RpcConnectionRegistry (#3601)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 91db10a  HBASE-26180 Introduce a initial refresh interval for RpcConnectionRegistry (#3601)
91db10a is described below

commit 91db10a8bc4fe8751f222239cfd0500c4b4c8042
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Aug 25 18:42:04 2021 +0800

    HBASE-26180 Introduce a initial refresh interval for RpcConnectionRegistry (#3601)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
---
 .../client/AbstractRpcBasedConnectionRegistry.java | 10 ++++---
 .../apache/hadoop/hbase/client/MasterRegistry.java |  5 +++-
 .../hbase/client/RegistryEndpointsRefresher.java   | 33 ++++++++++++++++------
 .../hadoop/hbase/client/RpcConnectionRegistry.java | 14 ++++++++-
 .../client/TestRegistryEndpointsRefresher.java     | 32 +++++++++++++++------
 .../client/TestRpcBasedRegistryHedgedReads.java    |  6 +++-
 .../hbase/client/TestRpcConnectionRegistry.java    |  5 ++--
 7 files changed, 79 insertions(+), 26 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
index 5fdbc55..60137d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
@@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
   private final RegistryEndpointsRefresher registryEndpointRefresher;
 
   protected AbstractRpcBasedConnectionRegistry(Configuration conf,
-    String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
-    String minRefreshIntervalSecsConfigName) throws IOException {
+    String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
+    String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
+    throws IOException {
     this.hedgedReadFanOut =
       Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
     rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
@@ -103,8 +104,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     populateStubs(getBootstrapNodes(conf));
     // could return null here is refresh interval is less than zero
-    registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
-      refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
+    registryEndpointRefresher =
+      RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
+        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
   }
 
   protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index af6eaa5..64e389c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
   public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
     "hbase.client.master_registry.hedged.fanout";
 
+  public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
+    "hbase.client.master_registry.initial_refresh_delay_secs";
+
   public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
     "hbase.client.master_registry.refresh_interval_secs";
 
@@ -85,7 +88,7 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
   }
 
   MasterRegistry(Configuration conf) throws IOException {
-    super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+    super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
       MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
index 9b450f9..2064021 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java
@@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {
 
   private final Thread thread;
   private final Refresher refresher;
+  private final long initialDelayMs;
   private final long periodicRefreshMs;
   private final long minTimeBetweenRefreshesMs;
 
@@ -56,9 +57,20 @@ final class RegistryEndpointsRefresher {
     notifyAll();
   }
 
+  private long getRefreshIntervalMs(boolean firstRefresh) {
+    if (refreshNow) {
+      return minTimeBetweenRefreshesMs;
+    }
+    if (firstRefresh) {
+      return initialDelayMs;
+    }
+    return periodicRefreshMs;
+  }
+
   // The main loop for the refresh thread.
   private void mainLoop() {
     long lastRefreshTime = EnvironmentEdgeManager.currentTime();
+    boolean firstRefresh = true;
     for (;;) {
       synchronized (this) {
         for (;;) {
@@ -68,9 +80,12 @@ final class RegistryEndpointsRefresher {
           }
           // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
           // otherwise wait until periodicRefreshMs elapsed
-          long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
+          long waitTime = getRefreshIntervalMs(firstRefresh) -
             (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
           if (waitTime <= 0) {
+            // we are going to refresh, reset this flag
+            firstRefresh = false;
+            refreshNow = false;
             break;
           }
           try {
@@ -81,8 +96,6 @@ final class RegistryEndpointsRefresher {
             continue;
           }
         }
-        // we are going to refresh, reset this flag
-        refreshNow = false;
       }
       LOG.debug("Attempting to refresh registry end points");
       try {
@@ -104,8 +117,9 @@ final class RegistryEndpointsRefresher {
     void refresh() throws IOException;
   }
 
-  private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
-    Refresher refresher) {
+  private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
+    long minTimeBetweenRefreshesMs, Refresher refresher) {
+    this.initialDelayMs = initialDelayMs;
     this.periodicRefreshMs = periodicRefreshMs;
     this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
     this.refresher = refresher;
@@ -129,16 +143,19 @@ final class RegistryEndpointsRefresher {
    * {@code intervalSecsConfigName} is less than zero, will return null here, which means disable
    * refreshing of endpoints.
    */
-  static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
-    String minIntervalSecsConfigName, Refresher refresher) {
+  static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
+    String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
     long periodicRefreshMs = TimeUnit.SECONDS
       .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
     if (periodicRefreshMs <= 0) {
       return null;
     }
+    long initialDelayMs = Math.max(1,
+      TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
     long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
       .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
     Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
-    return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
+    return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
+      minTimeBetweenRefreshesMs, refresher);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
index bcd37b1..731d620 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
@@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
   /** Configuration key that controls the fan out of requests **/
   public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
 
+  /**
+   * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
+   * possible that different end users will configure the same machine, which would overload that
+   * machine. So we should have a shorter delay for the initial refresh, to let users quickly switch
+   * to the bootstrap nodes we want them to connect to.
+   * <p/>
+   * The default value for initial refresh delay is 1/10 of periodic refresh interval.
+   */
+  public static final String INITIAL_REFRESH_DELAY_SECS =
+    "hbase.client.bootstrap.initial_refresh_delay_secs";
+
   public static final String PERIODIC_REFRESH_INTERVAL_SECS =
     "hbase.client.bootstrap.refresh_interval_secs";
 
@@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
   private static final char ADDRS_CONF_SEPARATOR = ',';
 
   RpcConnectionRegistry(Configuration conf) throws IOException {
-    super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
+    super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
+      MIN_SECS_BETWEEN_REFRESHES);
   }
 
   @Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
index f8893c9..3d6fe15 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);
 
+  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+    "hbase.test.registry.initial.delay.secs";
   private static final String INTERVAL_SECS_CONFIG_NAME =
     "hbase.test.registry.refresh.interval.secs";
   private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
@@ -75,33 +77,45 @@ public class TestRegistryEndpointsRefresher {
     callTimestamps.add(EnvironmentEdgeManager.currentTime());
   }
 
-  private void createRefresher(long intervalSecs, long minIntervalSecs) {
+  private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
+    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
     conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
     conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
-    refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
-      MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
+    refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
+      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
   }
 
   @Test
   public void testDisableRefresh() {
     conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
     assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
-      MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
+      INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
   }
 
   @Test
-  public void testPeriodicEndpointRefresh() throws IOException {
+  public void testInitialDelay() throws InterruptedException {
+    createRefresher(1, 10, 0);
+    // Wait for up to 2 seconds to see that the first refresh has been made, since the initial
+    // delay is 1 second
+    Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
+    // Sleep 5 more seconds to make sure we have not made new calls since the interval is 10 seconds
+    Thread.sleep(5000);
+    assertEquals(1, refreshCallCounter.get());
+  }
+
+  @Test
+  public void testPeriodicMasterEndPointRefresh() {
     // Refresh every 1 second.
-    createRefresher(1, 0);
+    createRefresher(1, 1, 0);
     // Wait for > 3 seconds to see that at least 3 refresh have been made.
     Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
   }
 
   @Test
-  public void testDurationBetweenRefreshes() throws IOException {
+  public void testDurationBetweenRefreshes() {
     // Disable periodic refresh
     // A minimum duration of 1s between refreshes
-    createRefresher(Integer.MAX_VALUE, 1);
+    createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
     // Issue a ton of manual refreshes.
     for (int i = 0; i < 10000; i++) {
       refresher.refreshNow();
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
index 66eee82..2979743 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
@@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
   private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
 
   private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
+  private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
+    "hbase.test.refresh.initial.delay.secs";
   private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
     "hbase.test.refresh.interval.secs";
   private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
@@ -153,7 +155,8 @@ public class TestRpcBasedRegistryHedgedReads {
     Configuration conf = UTIL.getConfiguration();
     conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
     return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
-      REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
+      INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
+      MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
 
       @Override
       protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
@@ -173,6 +176,7 @@ public class TestRpcBasedRegistryHedgedReads {
     conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
       RpcClient.class);
     // disable refresh, we do not need to refresh in this test
+    conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
     conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
     conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
     BOOTSTRAP_NODES = IntStream.range(0, 10)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
index 69c08b6..4cbabb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java
@@ -58,6 +58,7 @@ public class TestRpcConnectionRegistry {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // allow refresh immediately so we will switch to use region servers soon.
+    UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
     UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
     UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
     UTIL.startMiniCluster(3);
@@ -91,8 +92,8 @@ public class TestRpcConnectionRegistry {
   @Test
   public void testRegistryRPCs() throws Exception {
     HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
-    // wait until we switch to use region servers
-    UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
+    // sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints
+    Thread.sleep(3000);
     assertThat(registry.getParsedServers(),
       hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));