You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/07/19 19:08:06 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2095: HBASE-24459 Move the locateMeta logic from AsyncMetaRegionTableLocato…

virajjasani commented on a change in pull request #2095:
URL: https://github.com/apache/hbase/pull/2095#discussion_r456931717



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(stubMaker.apply(
+              rpcClient.createRpcChannel(addr, user, toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, type);
+  }
+
+  private static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
+    AtomicReference<CompletableFuture<T>> futureRef, 
+    Supplier<CompletableFuture<T>> fetch, String type) {
+    for (;;) {
+      T cachedValue = cacheRef.get();
+      if (cachedValue != null) {
+        return CompletableFuture.completedFuture(cachedValue);
+      }
+      LOG.trace("{} cache is null, try fetching from registry", type);
+      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+        LOG.debug("Start fetching {} from registry", type);
+        CompletableFuture<T> future = futureRef.get();
+        addListener(fetch.get(), (value, error) -> {
+          if (error != null) {
+            LOG.debug("Failed to fetch {} from registry", type, error);
+            futureRef.getAndSet(null).completeExceptionally(error);

Review comment:
       Is there any chance that a subsequent execution with an error could produce an NPE here? E.g., the first thread encounters an error and sets futureRef to null, and then a second thread ends up calling `completeExceptionally()` on null?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -634,6 +631,27 @@ protected String getUseThisHostnameInstead(Configuration conf) {
   @Override
   public void run() {
     try {
+      // we have to do this in a background thread as for a fresh new cluster, we need to become
+      // active master first to set the cluster id so we can initialize the cluster connection.
+      // for backup master, we need to use async cluster connection to connect to active master for
+      // fetching the content of root table, to serve the locate meta requests from client.
+      Threads.setDaemonThreadRunning(new Thread() {
+
+        @Override
+        public void run() {

Review comment:
       nit: replace with lambda?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -376,12 +376,11 @@ public void run() {
   // manager of assignment nodes in zookeeper
   private AssignmentManager assignmentManager;
 
-
   /**
    * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
    * cache entries.
    */
-  private final MetaRegionLocationCache metaRegionLocationCache;
+  private volatile MetaLocationCache metaLocationCache;

Review comment:
       Yes, this seems better, hopefully without too much of a perf impact when MasterRpcServices accesses it.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaLocationCache.java
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.locateRowBefore;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocateType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A cache of meta region locations.
+ */
+@InterfaceAudience.Private
+class MetaLocationCache implements Stoppable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetaLocationCache.class);
+
+  @VisibleForTesting
+  static final String SYNC_INTERVAL_SECONDS =
+    "hbase.master.meta-location-cache.sync-interval-seconds";
+
+  // default sync every 1 second.
+  @VisibleForTesting
+  static final int DEFAULT_SYNC_INTERVAL_SECONDS = 1;
+
+  private static final String FETCH_TIMEOUT_MS =
+    "hbase.master.meta-location-cache.fetch-timeout-ms";
+
+  // default timeout 1 second
+  private static final int DEFAULT_FETCH_TIMEOUT_MS = 1000;
+
+  private static final class CacheHolder {
+
+    final NavigableMap<byte[], RegionLocations> cache;
+
+    final List<HRegionLocation> all;
+
+    CacheHolder(List<HRegionLocation> all) {
+      this.all = Collections.unmodifiableList(all);
+      NavigableMap<byte[], SortedSet<HRegionLocation>> startKeyToLocs =
+        new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (HRegionLocation loc : all) {
+        if (loc.getRegion().isSplitParent()) {
+          continue;
+        }
+        startKeyToLocs.computeIfAbsent(loc.getRegion().getStartKey(),
+          k -> new TreeSet<>((l1, l2) -> l1.getRegion().compareTo(l2.getRegion()))).add(loc);
+      }
+      this.cache = startKeyToLocs.entrySet().stream().collect(Collectors.collectingAndThen(
+        Collectors.toMap(Map.Entry::getKey, e -> new RegionLocations(e.getValue()), (u, v) -> {
+          throw new IllegalStateException();
+        }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)), Collections::unmodifiableNavigableMap));
+    }
+  }
+
+  private volatile CacheHolder holder;
+
+  private volatile boolean stopped = false;
+
+  MetaLocationCache(MasterServices master) {
+    int syncIntervalSeconds =
+      master.getConfiguration().getInt(SYNC_INTERVAL_SECONDS, DEFAULT_SYNC_INTERVAL_SECONDS);
+    int fetchTimeoutMs =
+      master.getConfiguration().getInt(FETCH_TIMEOUT_MS, DEFAULT_FETCH_TIMEOUT_MS);
+    master.getChoreService().scheduleChore(new ScheduledChore(
+      getClass().getSimpleName() + "-Sync-Chore", this, syncIntervalSeconds, 0, TimeUnit.SECONDS) {
+
+      @Override
+      protected void chore() {
+        AsyncClusterConnection conn = master.getAsyncClusterConnection();
+        if (conn != null) {
+          addListener(conn.getAllMetaRegionLocations(fetchTimeoutMs), (locs, error) -> {
+            if (error != null) {
+              LOG.warn("Failed to fetch all meta region locations from active master", error);
+              return;
+            }
+            CacheHolder ch = new CacheHolder(locs);
+            holder = ch;

Review comment:
       nit: `holder = new CacheHolder(locs)`

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -2318,19 +2333,17 @@ public long createSystemTable(final TableDescriptor tableDescriptor) throws IOEx
     return procId;
   }
 
-  private void startActiveMasterManager(int infoPort) throws KeeperException {
+  private void startActiveMasterManager(int infoPort) throws KeeperException, IOException {

Review comment:
       nit: IOException is redundant

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
##########
@@ -59,9 +74,29 @@
 
   private final ZNodePaths znodePaths;
 
-  ZKConnectionRegistry(Configuration conf) {
+  private final AtomicReference<Interface> stub = new AtomicReference<>();
+
+  private final AtomicReference<CompletableFuture<Interface>> stubMakeFuture =
+    new AtomicReference<>();

Review comment:
       nit: Since we have multiple `Interface` in MasterProtos, would you prefer providing `ClientMetaService.Interface` for better readability? i.e `AtomicReference<ClientMetaService.Interface>` and `AtomicReference<CompletableFuture<ClientMetaService.Interface>>`
   
   No strong opinion though.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -3276,11 +3290,16 @@ public LocateMetaRegionResponse locateMetaRegion(RpcController controller,
     byte[] row = request.getRow().toByteArray();
     RegionLocateType locateType = ProtobufUtil.toRegionLocateType(request.getLocateType());
     try {
-      master.checkServiceStarted();

Review comment:
       This method `getMetaRegionLocations()` is no longer being called from MasterRegistry. 
   `MasterRegistry.getMetaRegionLocations()` is not in use:
   ```
     public CompletableFuture<RegionLocations> getMetaRegionLocations()
   ```
   
   Other RPC calls, i.e. `locateMetaRegion()` and `getAllMetaRegionLocations()`, are in use.

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry registry,
+    AtomicReference<T> stub, AtomicReference<CompletableFuture<T>> stubMakeFuture,
+    RpcClient rpcClient, User user, long rpcTimeout, TimeUnit unit,
+    Function<RpcChannel, T> stubMaker, String type) {
+    return getOrFetch(stub, stubMakeFuture, () -> {
+      CompletableFuture<T> future = new CompletableFuture<>();
+      addListener(registry.getActiveMaster(), (addr, error) -> {
+        if (error != null) {
+          future.completeExceptionally(error);
+        } else if (addr == null) {
+          future.completeExceptionally(new MasterNotRunningException(
+            "ZooKeeper available but no active master location found"));
+        } else {
+          LOG.debug("The fetched master address is {}", addr);
+          try {
+            future.complete(stubMaker.apply(
+              rpcClient.createRpcChannel(addr, user, toIntNoOverflow(unit.toMillis(rpcTimeout)))));
+          } catch (IOException e) {
+            future.completeExceptionally(e);
+          }
+        }
+
+      });
+      return future;
+    }, type);
+  }
+
+  private static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
+    AtomicReference<CompletableFuture<T>> futureRef, 
+    Supplier<CompletableFuture<T>> fetch, String type) {
+    for (;;) {
+      T cachedValue = cacheRef.get();
+      if (cachedValue != null) {
+        return CompletableFuture.completedFuture(cachedValue);
+      }
+      LOG.trace("{} cache is null, try fetching from registry", type);
+      if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
+        LOG.debug("Start fetching {} from registry", type);
+        CompletableFuture<T> future = futureRef.get();
+        addListener(fetch.get(), (value, error) -> {
+          if (error != null) {
+            LOG.debug("Failed to fetch {} from registry", type, error);
+            futureRef.getAndSet(null).completeExceptionally(error);
+            return;
+          }
+          LOG.debug("The fetched {} is {}", type, value);
+          // Here we update cache before reset future, so it is possible that someone can get a
+          // stale value. Consider this:
+          // 1. update cacheRef
+          // 2. someone clears the cache and relocates again
+          // 3. the futureRef is not null so the old future is used.
+          // 4. we clear futureRef and complete the future in it with the value being
+          // cleared in step 2.
+          // But we do not think it is a big deal as it rarely happens, and even if it happens, the
+          // caller will retry again later, no correctness problems.
+          cacheRef.set(value);
+          futureRef.set(null);
+          future.complete(value);
+        });
+        return future;
+      } else {
+        CompletableFuture<T> future = futureRef.get();
+        if (future != null) {
+          return future;
+        }
+      }
+    }
+  }
+
+  public static CompletableFuture<List<HRegionLocation>> getAllMetaRegionLocations(
+    boolean excludeOfflinedSplitParents,
+    CompletableFuture<ClientMetaService.Interface> getStubFuture,
+    AtomicReference<ClientMetaService.Interface> stubRef,
+    RpcControllerFactory rpcControllerFactory, int callTimeoutMs) {

Review comment:
       Similar to above methods, we can use generics here also:
   
   ```
     public static <T extends ClientMetaService.Interface> CompletableFuture<List<HRegionLocation>> getAllMetaRegionLocations(
       boolean excludeOfflinedSplitParents, CompletableFuture<T> getStubFuture,
       AtomicReference<T> stubRef, RpcControllerFactory rpcControllerFactory, int callTimeoutMs) {
   ```
   &
   ```
     public static<T extends ClientMetaService.Interface> void tryClearMasterStubCache(IOException error,
       T currentStub, AtomicReference<T> stub) {
   ```
   
   Since this is a Utils class, this might suit it well. Thoughts?
   But if you feel this is overkill, we are good without that change.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -3305,24 +3324,31 @@ public GetAllMetaRegionLocationsResponse getAllMetaRegionLocations(RpcController
     GetAllMetaRegionLocationsRequest request) throws ServiceException {
     boolean excludeOfflinedSplitParents = request.getExcludeOfflinedSplitParents();
     try {
-      master.checkServiceStarted();
       if (master.getMasterCoprocessorHost() != null) {
         master.getMasterCoprocessorHost().preGetAllMetaRegionLocations(excludeOfflinedSplitParents);
       }
-      List<RegionLocations> locs = master.getAllMetaRegionLocations(excludeOfflinedSplitParents);
-      List<HRegionLocation> list = new ArrayList<>();
-      GetAllMetaRegionLocationsResponse.Builder builder =
-        GetAllMetaRegionLocationsResponse.newBuilder();
-      if (locs != null) {
-        for (RegionLocations ls : locs) {
-          for (HRegionLocation loc : ls) {
-            if (loc != null) {
-              builder.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
-              list.add(loc);
+      MetaLocationCache cache = master.getMetaLocationCache();
+      List<HRegionLocation> list;
+      if (cache != null) {

Review comment:
       We never expect a null cache for backup masters, right?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -634,6 +631,27 @@ protected String getUseThisHostnameInstead(Configuration conf) {
   @Override
   public void run() {
     try {
+      // we have to do this in a background thread as for a fresh new cluster, we need to become
+      // active master first to set the cluster id so we can initialize the cluster connection.
+      // for backup master, we need to use async cluster connection to connect to active master for
+      // fetching the content of root table, to serve the locate meta requests from client.
+      Threads.setDaemonThreadRunning(new Thread() {
+
+        @Override
+        public void run() {
+          for (;;) {
+            try {
+              if (!Strings.isNullOrEmpty(ZKClusterId.readClusterIdZNode(zooKeeper))) {
+                setupClusterConnection();
+                break;
+              }

Review comment:
       For the else part, maybe we can add a trace log?

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
##########
@@ -652,4 +626,160 @@ static void setCoprocessorError(RpcController controller, Throwable error) {
       controller.setFailed(error.toString());
     }
   }
+
+  public static RegionLocations locateRow(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    Map.Entry<byte[], RegionLocations> entry = cache.floorEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    byte[] endKey = loc.getRegion().getEndKey();
+    if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static RegionLocations locateRowBefore(NavigableMap<byte[], RegionLocations> cache,
+    TableName tableName, byte[] row, int replicaId) {
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    Map.Entry<byte[], RegionLocations> entry =
+      isEmptyStopRow ? cache.lastEntry() : cache.lowerEntry(row);
+    if (entry == null) {
+      return null;
+    }
+    RegionLocations locs = entry.getValue();
+    HRegionLocation loc = locs.getRegionLocation(replicaId);
+    if (loc == null) {
+      return null;
+    }
+    if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
+      (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
+          Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
+      }
+      return locs;
+    } else {
+      return null;
+    }
+  }
+
+  public static void tryClearMasterStubCache(IOException error,
+    ClientMetaService.Interface currentStub, AtomicReference<ClientMetaService.Interface> stub) {
+    if (ClientExceptionsUtil.isConnectionException(error) ||
+      error instanceof ServerNotRunningYetException) {
+      stub.compareAndSet(currentStub, null);
+    }
+  }
+
+  public static <T> CompletableFuture<T> getMasterStub(ConnectionRegistry registry,

Review comment:
       Since this is used for both `MasterService.Interface` and `ClientMetaService.Interface`, can we give the method a more generic name? Maybe just `getStub()`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org