Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/08/14 01:54:28 UTC

[GitHub] [hbase] Apache9 commented on a change in pull request #2130: HBASE-24765: Dynamic master discovery

Apache9 commented on a change in pull request #2130:
URL: https://github.com/apache/hbase/pull/2130#discussion_r459835061



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have some spurious
+        // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
+        synchronized (refreshMasters) {

Review comment:
       A notify without a guarding flag usually leads to strange race problems...
   At least let's add a 'triggerRefresh' flag to guard against redundant notifications?
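
   A minimal sketch of the flag-guarded notification suggested here, not the PR's code. `RefreshTrigger`, `requestRefresh` and `awaitRefresh` are hypothetical names; only the `triggerRefresh` flag name comes from the comment above. The flag records a request even when no thread is waiting yet, and repeated requests collapse into one.

```java
/** Hypothetical sketch of a flag-guarded wait/notify; not taken from the patch. */
final class RefreshTrigger {
  private final Object lock = new Object();
  // Guarded by 'lock'. Set by callers, cleared by the refreshing thread.
  private boolean triggerRefresh = false;

  /** Returns true if this call set the flag, false if a refresh was already pending. */
  boolean requestRefresh() {
    synchronized (lock) {
      if (triggerRefresh) {
        return false; // redundant request, no need to notify again
      }
      triggerRefresh = true;
      lock.notifyAll();
      return true;
    }
  }

  /**
   * Blocks until a refresh has been requested or the timeout elapses.
   * Returns true (and clears the flag) on a request, false on timeout.
   */
  boolean awaitRefresh(long timeoutMs) throws InterruptedException {
    synchronized (lock) {
      long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
      // Loop on the flag so spurious wake-ups do not count as requests.
      while (!triggerRefresh) {
        long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
        if (remainingMs <= 0) {
          return false;
        }
        lock.wait(remainingMs);
      }
      triggerRefresh = false;
      return true;
    }
  }
}
```

   With this shape the refreshing thread can treat a `false` return as the periodic refresh and a `true` return as an on-demand one.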

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -226,17 +276,19 @@ private IOException badResponse(String debug) {
 
   private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
     Predicate<T> isValidResp, String debug) {
+    Set<ServerName> masterServers = masterAddr2Stub.keySet();

Review comment:
       I think we should assign masterAddr2Stub to a local variable here before using it? The field is now volatile and can be replaced by a refresh between two reads.
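
   A small sketch of the local-variable idea, assuming the map is swapped wholesale and never mutated in place. `StubHolder` and its `String` values are hypothetical stand-ins for the registry and its stubs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical stand-in for a class holding a volatile, replaceable map. */
final class StubHolder {
  // Replaced as a whole by a refresher thread; each map instance is immutable.
  private volatile Map<String, String> addr2Stub;

  StubHolder(Map<String, String> initial) {
    this.addr2Stub = Collections.unmodifiableMap(new HashMap<>(initial));
  }

  void replace(Map<String, String> next) {
    this.addr2Stub = Collections.unmodifiableMap(new HashMap<>(next));
  }

  String describe() {
    // Read the volatile field once so keys and values come from the same map,
    // even if replace() runs concurrently.
    Map<String, String> snapshot = addr2Stub;
    StringBuilder sb = new StringBuilder();
    for (String addr : new TreeSet<>(snapshot.keySet())) {
      sb.append(addr).append('=').append(snapshot.get(addr)).append(';');
    }
    return sb.toString();
  }
}
```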

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -2931,6 +2935,27 @@ public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
     return resp.build();
   }
 
+  @Override
+  public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
+      throws ServiceException {
+    GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
+    // Active master
+    Optional<ServerName> serverName = master.getActiveMaster();
+    serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
+        .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
+    // Backup masters
+    try {
+      // TODO: Cache the backup masters to avoid a ZK RPC for each getMasters() call.

Review comment:
       I think this should be done in this patch. This method now replaces the old getActiveMaster method, so it is used not only by our internal refresh but also by end users, and we should not leave users the ability to hammer ZooKeeper...
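
   One possible shape for such a cache, sketched with a simple time-to-live. This is an assumption for illustration only: `ExpiringCache` is a hypothetical helper, and a watcher-based cache could serve the same purpose. The loader stands in for the ZooKeeper read.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical TTL cache; the loader stands in for the expensive ZK lookup. */
final class ExpiringCache<T> {
  private final Supplier<T> loader;
  private final LongSupplier clockMs;
  private final long ttlMs;
  private T value;
  private long loadedAtMs;
  private boolean loaded = false;

  ExpiringCache(Supplier<T> loader, LongSupplier clockMs, long ttlMs) {
    this.loader = loader;
    this.clockMs = clockMs;
    this.ttlMs = ttlMs;
  }

  /** Returns the cached value, reloading it only when it is missing or older than the TTL. */
  synchronized T get() {
    long now = clockMs.getAsLong();
    if (!loaded || now - loadedAtMs >= ttlMs) {
      value = loader.get();
      loadedAtMs = now;
      loaded = true;
    }
    return value;
  }
}
```

   However many clients call `get()`, the loader runs at most once per TTL window, which bounds the load on the backing store.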

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -259,17 +311,40 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo
       .thenApply(GetClusterIdResponse::getClusterId);
   }
 
-  private ServerName transformServerName(GetActiveMasterResponse resp) {
-    return ProtobufUtil.toServerName(resp.getServerName());
+  private static boolean hasActiveMaster(GetMastersResponse resp) {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+        Collectors.toList());
+    return activeMasters.size() == 1;
+  }
+
+  private static ServerName filterActiveMaster(GetMastersResponse resp) {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+            Collectors.toList());
+    Preconditions.checkState(activeMasters.size() == 1);

Review comment:
       This is a behavior change? I do not think we threw IllegalStateException when there was no active master before this patch?
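
   For illustration, one way to avoid throwing from the filter step is to return an empty result and let the caller decide how to report it. This is only a sketch: `ActiveMasterFilter` and `Entry` are hypothetical stand-ins for the protobuf types.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: report "no single active master" without throwing. */
final class ActiveMasterFilter {
  /** Hypothetical stand-in for GetMastersResponseEntry. */
  static final class Entry {
    final String serverName;
    final boolean active;

    Entry(String serverName, boolean active) {
      this.serverName = serverName;
      this.active = active;
    }
  }

  /** Returns the active master's name, or empty if there is none or more than one. */
  static Optional<String> findActive(List<Entry> entries) {
    String found = null;
    for (Entry e : entries) {
      if (e.active) {
        if (found != null) {
          return Optional.empty(); // ambiguous: more than one active entry
        }
        found = e.serverName;
      }
    }
    return Optional.ofNullable(found);
  }
}
```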

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -89,11 +97,17 @@
   private final int hedgedReadFanOut;
 
   // Configured list of masters to probe the meta information from.
-  private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
+  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
 
   // RPC client used to talk to the masters.
   private final RpcClient rpcClient;
   private final RpcControllerFactory rpcControllerFactory;
+  private final int rpcTimeoutMs;
+  // For synchronizing on refreshing the master end-points
+  private final Object refreshMasters = new Object();
+  // Refreshed every WAIT_TIME_OUT_MS or unless explicitly invoked.
+  private static final int WAIT_TIME_OUT_MS = 5 * 60 * 1000; // 5 mins

Review comment:
       Do we want to make it configurable?
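
   A rough sketch of reading the interval from configuration with a fallback default. `java.util.Properties` is used here as a stand-in for the Hadoop `Configuration` class so that the example is self-contained, and `RefreshIntervalConfig` is a hypothetical name. The key and the 300 second default are the ones the MasterAddressRefresher hunk quoted later in this review uses.

```java
import java.util.Properties;

/** Hypothetical sketch; Properties stands in for Hadoop's Configuration. */
final class RefreshIntervalConfig {
  static final String KEY = "hbase.client.master_registry.refresh_interval_secs";
  static final int DEFAULT_SECS = 300;

  /** Returns the refresh interval in milliseconds, using the default for missing or bad values. */
  static long refreshIntervalMs(Properties conf) {
    int secs = DEFAULT_SECS;
    String raw = conf.getProperty(KEY);
    if (raw != null) {
      try {
        secs = Integer.parseInt(raw.trim());
      } catch (NumberFormatException e) {
        secs = DEFAULT_SECS; // unparsable value: fall back to the default
      }
    }
    if (secs <= 0) {
      secs = DEFAULT_SECS; // non-positive intervals are not meaningful
    }
    return secs * 1000L;
  }
}
```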

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -259,17 +311,40 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo
       .thenApply(GetClusterIdResponse::getClusterId);
   }
 
-  private ServerName transformServerName(GetActiveMasterResponse resp) {
-    return ProtobufUtil.toServerName(resp.getServerName());
+  private static boolean hasActiveMaster(GetMastersResponse resp) {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+        Collectors.toList());
+    return activeMasters.size() == 1;
+  }
+
+  private static ServerName filterActiveMaster(GetMastersResponse resp) {
+    List<GetMastersResponseEntry> activeMasters =
+        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
+            Collectors.toList());
+    Preconditions.checkState(activeMasters.size() == 1);
+    return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
   }
 
   @Override
   public CompletableFuture<ServerName> getActiveMaster() {
     return this
-      .<GetActiveMasterResponse> call(
-        (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
-        GetActiveMasterResponse::hasServerName, "getActiveMaster()")
-      .thenApply(this::transformServerName);
+      .<GetMastersResponse> call(
+        (c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
+        MasterRegistry::hasActiveMaster, "getMasters()")
+      .thenApply(MasterRegistry::filterActiveMaster);
+  }
+
+  private static List<ServerName> transformServerNames(GetMastersResponse resp) {
+    return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
+        s.getServerName())).collect(Collectors.toList());
+  }
+
+  public CompletableFuture<List<ServerName>> getMasters() {

Review comment:
       This does not need to be public?

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -115,20 +129,50 @@
   MasterRegistry(Configuration conf) throws IOException {
     this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
       MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
-    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
       conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
     // this through the master registry...
     // This is a problem as we will use the cluster id to determine the authentication method
     rpcClient = RpcClientFactory.createClient(conf, null);
     rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
+    // by fetching the end points from this list.
+    populateMasterStubs(parseMasterAddrs(conf));
+    Runnable masterEndPointRefresher = () -> {
+      while (!Thread.interrupted()) {

Review comment:
       I prefer we add a stop flag for the Registry and test it to determine whether we should quit.
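
   A minimal sketch of a loop that exits on an explicit stop flag rather than on the thread's interrupt status. `StoppableLoop` is a hypothetical class, not the registry itself.

```java
/** Hypothetical sketch of a worker loop controlled by a volatile stop flag. */
final class StoppableLoop implements Runnable {
  // volatile so a stop() from another thread is visible to the looping thread
  private volatile boolean stopped = false;
  private final Runnable body;

  StoppableLoop(Runnable body) {
    this.body = body;
  }

  void stop() {
    stopped = true;
  }

  boolean isStopped() {
    return stopped;
  }

  @Override
  public void run() {
    // The flag is checked before every iteration; an interrupt alone does not end the loop.
    while (!stopped) {
      body.run();
    }
  }
}
```

   If the body blocks in a wait or sleep, the closing code would typically set the flag and then interrupt the thread, so that the loop wakes up and sees the flag.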

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -170,6 +214,11 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti
     callable.call(controller, stub, resp -> {
       if (controller.failed()) {
         future.completeExceptionally(controller.getFailed());
+        // RPC has failed, trigger a refresh of master end points. We can have some spurious

Review comment:
       I think we can test the exception type? Refresh the address list only if it is a connect exception?
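
   A sketch of such a check, assuming the connection failure surfaces as a `java.net.ConnectException` somewhere in the cause chain. Whether the RPC layer wraps it differently is not confirmed here, and `RefreshPolicy` is a hypothetical name.

```java
import java.net.ConnectException;

/** Hypothetical sketch: decide whether a failure should trigger an address refresh. */
final class RefreshPolicy {
  // Upper bound on how far to walk the cause chain, as a guard against cycles.
  private static final int MAX_DEPTH = 16;

  /** Returns true only if the failure or one of its causes is a ConnectException. */
  static boolean shouldRefresh(Throwable failure) {
    Throwable t = failure;
    for (int depth = 0; t != null && depth < MAX_DEPTH; depth++) {
      if (t instanceof ConnectException) {
        return true;
      }
      t = t.getCause();
    }
    return false;
  }
}
```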

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+
+/**
+ * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
+ * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
+ * By default the refresh happens periodically (configured via
+ * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
+ * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any two
+ * attempts should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
+ */
+@InterfaceAudience.Private
+public class MasterAddressRefresher implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
+  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
+      "hbase.client.master_registry.refresh_interval_secs";
+  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
+  public static final String MIN_SECS_BETWEEN_REFRESHES =
+      "hbase.client.master_registry.min_secs_between_refreshes";
+  private static final long MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
+
+  private final ExecutorService pool;
+  private final MasterRegistry registry;
+  private final long periodicRefreshMs;
+  private final long timeBetweenRefreshesMs;
+  private final Object refreshMasters = new Object();
+
+  @Override
+  public void close() {
+    pool.shutdownNow();
+  }
+
+  /**
+   * Thread that refreshes the master end points until it is interrupted via {@link #close()}.
+   * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
+   */
+  private class RefreshThread implements Runnable {
+    @Override
+    public void run() {
+      long lastRpcTs = 0;
+      while (!Thread.interrupted()) {
+        try {
+          // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
+          // have duplicate refreshes because once the thread is past the wait(), notify()s are
+          // ignored until the thread is back to the waiting state.
+          synchronized (refreshMasters) {
+            refreshMasters.wait(periodicRefreshMs);
+          }
+          long currentTs = EnvironmentEdgeManager.currentTime();
+          if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
+            continue;
+          }
+          lastRpcTs = currentTs;
+          LOG.debug("Attempting to refresh master address end points.");
+          Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
+          registry.populateMasterStubs(newMasters);
+          LOG.debug("Finished refreshing master end points. {}", newMasters);
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
+          break;
+        } catch (ExecutionException | IOException e) {
+          LOG.debug("Error populating latest list of masters.", e);
+        }
+      }
+    }
+  }
+
+  MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
+    pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()

Review comment:
       I think this could be a follow-on issue. Let's get this feature in first, as I think it is very useful for a long-running service that depends on HBase. I have an idea for using the HashedWheelTimer to do the refreshing work, and will file an improvement issue if I have time.



