You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/05/06 09:09:45 UTC

[hbase] branch branch-2.3 updated: HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry … (#1593)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new b2f4e6e  HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry … (#1593)
b2f4e6e is described below

commit b2f4e6ecf6da88f617deca1b6ae76ed1d98be0f1
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed May 6 15:55:26 2020 +0800

    HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry … (#1593)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../apache/hadoop/hbase/client/MasterRegistry.java | 272 +++++++++++---------
 .../exceptions/MasterRegistryFetchException.java   |   3 +
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  11 +-
 .../apache/hadoop/hbase/ipc/HedgedRpcChannel.java  | 274 ---------------------
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java    |  21 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java     |  12 +-
 .../client/TestMasterRegistryHedgedReads.java      | 231 +++++++++++++++++
 .../java/org/apache/hadoop/hbase/HConstants.java   |  13 -
 .../hadoop/hbase/client/FromClientSideBase.java    |  31 ++-
 .../hadoop/hbase/client/TestClientTimeouts.java    |  11 +-
 .../hadoop/hbase/client/TestFromClientSide.java    |   1 +
 .../hadoop/hbase/client/TestMasterRegistry.java    |  30 ++-
 .../hbase/client/TestScannersFromClientSide.java   |  33 ++-
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 107 +-------
 .../hbase/ipc/TestProtobufRpcServiceImpl.java      |  15 +-
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java       |  14 --
 16 files changed, 445 insertions(+), 634 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 9b01429..4d0a591 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -18,9 +18,9 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
-import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
-import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
 import static org.apache.hadoop.hbase.util.DNS.getHostname;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -29,7 +29,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,11 +46,15 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.DNS.ServerType;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
@@ -61,53 +67,79 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaReg
 /**
  * Master based registry implementation. Makes RPCs to the configured master addresses from config
  * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
- *
- * It supports hedged reads, which can be enabled by setting
- * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
- * out the requests batch is controlled by
- * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
- *
+ * <p/>
+ * It supports hedged reads. Setting the fan out of the requests batch via
+ * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} enables them
+ * (the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
+ * <p/>
  * TODO: Handle changes to the configuration dynamically without having to restart the client.
  */
 @InterfaceAudience.Private
 public class MasterRegistry implements ConnectionRegistry {
+
+  /** Configuration key that controls the fan out of requests **/
+  public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
+    "hbase.client.master_registry.hedged.fanout";
+
+  /** Default value for the fan out of hedged requests. **/
+  public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
   private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
 
+  private final int hedgedReadFanOut;
+
   // Configured list of masters to probe the meta information from.
-  private final Set<ServerName> masterServers;
+  private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
 
   // RPC client used to talk to the masters.
   private final RpcClient rpcClient;
   private final RpcControllerFactory rpcControllerFactory;
-  private final int rpcTimeoutMs;
-
-  MasterRegistry(Configuration conf) throws UnknownHostException {
-    boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
-        MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
-    Configuration finalConf;
-    if (!hedgedReadsEnabled) {
-      // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
-      // the configuration so that other places reusing this reference is not affected.
-      finalConf = new Configuration(conf);
-      finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
-    } else {
-      finalConf = conf;
+
+  /**
+   * Parses the list of master addresses from the provided configuration. Supported format is comma
+   * separated host[:port] values. If no port number is specified, default master port is assumed.
+   * @param conf Configuration to parse from.
+   */
+  private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
+    Set<ServerName> masterAddrs = new HashSet<>();
+    String configuredMasters = getMasterAddr(conf);
+    for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+      HostAndPort masterHostPort =
+        HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+      masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
     }
-    if (conf.get(MASTER_ADDRS_KEY) != null) {
-      finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
+    Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
+    return masterAddrs;
+  }
+
+  MasterRegistry(Configuration conf) throws IOException {
+    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
+    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
+    // this through the master registry...
+    // This is a problem as we will use the cluster id to determine the authentication method
+    rpcClient = RpcClientFactory.createClient(conf, null);
+    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
+    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
+      ImmutableMap.builderWithExpectedSize(masterAddrs.size());
+    User user = User.getCurrent();
+    for (ServerName masterAddr : masterAddrs) {
+      builder.put(masterAddr,
+        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
     }
-    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    masterServers = new HashSet<>();
-    parseMasterAddrs(finalConf);
-    rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
-    rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
+    masterAddr2Stub = builder.build();
   }
 
   /**
    * Builds the default master address end point if it is not specified in the configuration.
+   * <p/>
+   * Will be called in {@code HBaseTestingUtility}.
    */
-  public static String getMasterAddr(Configuration conf) throws UnknownHostException  {
+  @VisibleForTesting
+  public static String getMasterAddr(Configuration conf) throws UnknownHostException {
     String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
     if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
       return masterAddrFromConf;
@@ -118,63 +150,87 @@ public class MasterRegistry implements ConnectionRegistry {
   }
 
   /**
-   * @return Stub needed to make RPC using a hedged channel to the master end points.
+   * For describing the actual asynchronous rpc call.
+   * <p/>
+   * Typically, you can use lambda expression to implement this interface as
+   *
+   * <pre>
+   * (c, s, d) -> s.xxx(c, your request here, d)
+   * </pre>
    */
-  private ClientMetaService.Interface getMasterStub() throws IOException {
-    return ClientMetaService.newStub(
-        rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  @FunctionalInterface
+  private interface Callable<T> {
+    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
   }
 
-  /**
-   * Parses the list of master addresses from the provided configuration. Supported format is
-   * comma separated host[:port] values. If no port number if specified, default master port is
-   * assumed.
-   * @param conf Configuration to parse from.
-   */
-  private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
-    String configuredMasters = getMasterAddr(conf);
-    for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
-      HostAndPort masterHostPort =
-          HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
-      masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
-    }
-    Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
+    Callable<T> callable) {
+    HBaseRpcController controller = rpcControllerFactory.newController();
+    CompletableFuture<T> future = new CompletableFuture<>();
+    callable.call(controller, stub, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
   }
 
-  @VisibleForTesting
-  public Set<ServerName> getParsedMasterServers() {
-    return Collections.unmodifiableSet(masterServers);
+  private IOException badResponse(String debug) {
+    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
   }
 
   /**
-   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
-   * the rpc finishes and the response is propagated to the passed future.
-   * @param future Result future to which the rpc response is propagated.
-   * @param isValidResp Checks if the rpc response has a valid result.
-   * @param transformResult Transforms the result to a different form as expected by callers.
-   * @param hrc RpcController instance for this rpc.
-   * @param debug Debug message passed along to the caller in case of exceptions.
-   * @param <T> RPC result type.
-   * @param <R> Transformed type of the result.
-   * @return A call back that can be embedded in the non-blocking rpc call.
+   * Sends requests concurrently to {@code hedgedReadFanOut} masters. If any of the requests
+   * succeeds, we complete the future and quit. If all the requests in one round fail, we start
+   * another round and send requests concurrently to the next {@code hedgedReadFanOut} masters. If
+   * all masters have been tried and all of them have failed, we fail the future.
    */
-  private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
-      Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
-      final String debug) {
-    return rpcResult -> {
-      if (rpcResult == null) {
-        future.completeExceptionally(
-            new MasterRegistryFetchException(masterServers, hrc.getFailed()));
-        return;
-      }
-      if (!isValidResp.test(rpcResult)) {
-        // Rpc returned ok, but result was malformed.
-        future.completeExceptionally(new IOException(
-            String.format("Invalid result for request %s. Will be retried", debug)));
-        return;
-      }
-      future.complete(transformResult.apply(rpcResult));
-    };
+  private <T extends Message> void groupCall(CompletableFuture<T> future,
+    List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
+    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
+    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
+    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
+    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+      addListener(call(masterStubs.get(i), callable), (r, e) -> {
+        // a simple check to skip all the later operations earlier
+        if (future.isDone()) {
+          return;
+        }
+        if (e == null && !isValidResp.test(r)) {
+          e = badResponse(debug);
+        }
+        if (e != null) {
+          // make sure when remaining reaches 0 we have all exceptions in the errors queue
+          errors.add(e);
+          if (remaining.decrementAndGet() == 0) {
+            if (endIndexExclusive == masterStubs.size()) {
+              // we are done, complete the future with exception
+              RetriesExhaustedException ex = new RetriesExhaustedException("masters",
+                masterStubs.size(), new ArrayList<>(errors));
+              future.completeExceptionally(
+                new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
+            } else {
+              groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
+                errors);
+            }
+          }
+        } else {
+          // do not need to decrement the counter any more as we have already finished the future.
+          future.complete(r);
+        }
+      });
+    }
+  }
+
+  private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
+    Predicate<T> isValidResp, String debug) {
+    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
+    Collections.shuffle(masterStubs, ThreadLocalRandom.current());
+    CompletableFuture<T> future = new CompletableFuture<>();
+    groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
+    return future;
   }
 
   /**
@@ -182,40 +238,25 @@ public class MasterRegistry implements ConnectionRegistry {
    */
   private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
     List<HRegionLocation> regionLocations = new ArrayList<>();
-    resp.getMetaLocationsList().forEach(
-      location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+    resp.getMetaLocationsList()
+      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
     return new RegionLocations(regionLocations);
   }
 
   @Override
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
-    CompletableFuture<RegionLocations> result = new CompletableFuture<>();
-    HBaseRpcController hrc = rpcControllerFactory.newController();
-    RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
-      (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
-        "getMetaRegionLocations()");
-    try {
-      getMasterStub().getMetaRegionLocations(
-          hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
-    } catch (IOException e) {
-      result.completeExceptionally(e);
-    }
-    return result;
+    return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
+      GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
+      "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
   }
 
   @Override
   public CompletableFuture<String> getClusterId() {
-    CompletableFuture<String> result = new CompletableFuture<>();
-    HBaseRpcController hrc = rpcControllerFactory.newController();
-    RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
-        GetClusterIdResponse::hasClusterId,  GetClusterIdResponse::getClusterId, hrc,
-        "getClusterId()");
-    try {
-      getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
-    } catch (IOException e) {
-      result.completeExceptionally(e);
-    }
-    return result;
+    return this
+      .<GetClusterIdResponse> call(
+        (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
+        GetClusterIdResponse::hasClusterId, "getClusterId()")
+      .thenApply(GetClusterIdResponse::getClusterId);
   }
 
   private ServerName transformServerName(GetActiveMasterResponse resp) {
@@ -224,17 +265,16 @@ public class MasterRegistry implements ConnectionRegistry {
 
   @Override
   public CompletableFuture<ServerName> getActiveMaster() {
-    CompletableFuture<ServerName> result = new CompletableFuture<>();
-    HBaseRpcController hrc = rpcControllerFactory.newController();
-    RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
-        GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
-        "getActiveMaster()");
-    try {
-      getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
-    } catch (IOException e) {
-      result.completeExceptionally(e);
-    }
-    return result;
+    return this
+      .<GetActiveMasterResponse> call(
+        (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
+        GetActiveMasterResponse::hasServerName, "getActiveMaster()")
+      .thenApply(this::transformServerName);
+  }
+
+  @VisibleForTesting
+  Set<ServerName> getParsedMasterServers() {
+    return masterAddr2Stub.keySet();
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
index 18871be..ca80ed5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
@@ -30,6 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class MasterRegistryFetchException extends HBaseIOException {
+
+  private static final long serialVersionUID = 6992134872168185171L;
+
   public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
     super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
         failure);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index b0baff2..bf2f361 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -47,6 +47,7 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@@ -60,6 +61,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 
 /**
@@ -512,13 +514,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
   }
 
-  @Override
-  public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
-      throws UnknownHostException {
-    // Check HedgedRpcChannel implementation for detailed comments.
-    throw new UnsupportedOperationException("Hedging not supported for this implementation.");
-  }
-
   private static class AbstractRpcChannel {
 
     protected final InetSocketAddress addr;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
deleted file mode 100644
index 7b681e0..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.PrettyPrinter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-
-/**
- * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
- * First received response is returned to the caller. This abstracts out the logic needed to batch
- * requests to multiple end points underneath and presents itself as a single logical RpcChannel to
- * the client.
- *
- * Hedging Details:
- * ---------------
- * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
- * end points to make the call to. We do multiple iterations until we get a proper response to the
- * rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
- * configurable and is also known as 'fanOutSize'.
- *
- * - We randomize the addresses up front so that the batch order per client is non deterministic.
- * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
- * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
- * load on the client and server side when configuring the fan out.
- *
- * - In a happy case, once we receive a response from one end point, we cancel all the
- * other inflight rpcs in the same batch and return the response to the caller. If we do not get a
- * valid response from any address end point, we propagate the error back to the caller.
- *
- * - Rpc timeouts are applied to every hedged rpc.
- *
- * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
- * be hedged (for example: cluster state changing rpcs).
- *
- * (TODO) Retries and Adaptive hedging policy:
- * ------------------------------------------
- *
- * - No retries are handled at the channel level. Retries can be built in upper layers. However the
- * question is, do we even need retries? Hedging in fact is a substitute for retries.
- *
- * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
- * policy more adaptive. In most happy cases, the rpcs from the first few end points should return
- * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
- * is not needed. So, the idea is to make this request pattern pluggable so that the requests are
- * hedged only when needed.
- */
-class HedgedRpcChannel implements RpcChannel {
-  private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
-
-  /**
-   * Currently hedging is only supported for non-blocking connection implementation types because
-   * the channel implementation inherently relies on the connection implementation being async.
-   * Refer to the comments in doCallMethod().
-   */
-  private final NettyRpcClient rpcClient;
-  // List of service addresses to hedge the requests to.
-  private final List<InetSocketAddress> addrs;
-  private final User ticket;
-  private final int rpcTimeout;
-  // Controls the size of request fan out (number of rpcs per a single batch).
-  private final int fanOutSize;
-
-  /**
-   * A simple rpc call back implementation to notify the batch context if any rpc is successful.
-   */
-  private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
-    private  final BatchRpcCtx batchRpcCtx;
-    private final HBaseRpcController rpcController;
-    BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
-      this.batchRpcCtx = batchRpcCtx;
-      this.rpcController = rpcController;
-    }
-    @Override
-    public void run(Message result) {
-      batchRpcCtx.setResultIfNotSet(result, rpcController);
-    }
-  }
-
-  /**
-   * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
-   * synchronize on multiple RPCs to different end points fetching the result. All the methods are
-   * thread-safe.
-   */
-  private static class BatchRpcCtx {
-    // Result set by the thread finishing first. Set only once.
-    private final AtomicReference<Message> result = new AtomicReference<>();
-    // Caller waits on this latch being set.
-    // We set this to 1, so that the first successful RPC result is returned to the client.
-    private CountDownLatch resultsReady = new CountDownLatch(1);
-    // Failed rpc book-keeping.
-    private AtomicInteger failedRpcCount = new AtomicInteger();
-    // All the call handles for this batch.
-    private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
-
-    // Target addresses.
-    private final List<InetSocketAddress> addresses;
-    // Called when the result is ready.
-    private final RpcCallback<Message> callBack;
-    // Last failed rpc's exception. Used to propagate the reason to the controller.
-    private IOException lastFailedRpcReason;
-
-
-    BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
-      this.addresses = addresses;
-      this.callBack = Preconditions.checkNotNull(callBack);
-    }
-
-    /**
-     * Sets the result only if it is not already set by another thread. Thread that successfully
-     * sets the result also count downs the latch.
-     * @param result Result to be set.
-     */
-    public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
-      if (rpcController.failed()) {
-        incrementFailedRpcs(rpcController.getFailed());
-        return;
-      }
-      if (this.result.compareAndSet(null, result)) {
-        resultsReady.countDown();
-        // Cancel all pending in flight calls.
-        for (Call call: callsInFlight) {
-          // It is ok to do it for all calls as it is a no-op if the call is already done.
-          final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
-              "for the same rpc already succeeded. This is not needed anymore.", call);
-          call.setException(new CallCancelledException(exceptionMsg));
-        }
-      }
-    }
-
-    /**
-     * Waits until the results are populated and calls the callback if the call is successful.
-     * @return true for successful rpc and false otherwise.
-     */
-    public boolean waitForResults() {
-      try {
-        // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
-        // something on the remote is broken. Worst case we should wait for rpc time out to kick in.
-        resultsReady.await();
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
-      }
-      Message message = result.get();
-      if (message != null) {
-        callBack.run(message);
-        return true;
-      }
-      return false;
-    }
-
-    public void addCallInFlight(Call c) {
-      callsInFlight.add(c);
-    }
-
-    public void incrementFailedRpcs(IOException reason) {
-      if (failedRpcCount.incrementAndGet() == addresses.size()) {
-        lastFailedRpcReason = reason;
-        // All the rpcs in this batch have failed. Invoke the waiting threads.
-        resultsReady.countDown();
-      }
-    }
-
-    public IOException getLastFailedRpcReason() {
-      return lastFailedRpcReason;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
-    }
-  }
-
-  public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
-      User ticket, int rpcTimeout, int fanOutSize) {
-    this.rpcClient = rpcClient;
-    this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
-    Preconditions.checkArgument(this.addrs.size() >= 1);
-    // For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
-    // order, creating hot spots on the service end points.
-    Collections.shuffle(this.addrs);
-    this.ticket = ticket;
-    this.rpcTimeout = rpcTimeout;
-    // fanOutSize controls the number of hedged RPCs per batch.
-    this.fanOutSize = fanOutSize;
-  }
-
-  private HBaseRpcController applyRpcTimeout(RpcController controller) {
-    HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
-    int rpcTimeoutToSet =
-        hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
-    HBaseRpcController response = new HBaseRpcControllerImpl();
-    response.setCallTimeout(rpcTimeoutToSet);
-    return response;
-  }
-
-  private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
-      Message request, Message responsePrototype, RpcCallback<Message> done) {
-    int i = 0;
-    BatchRpcCtx lastBatchCtx = null;
-    while (i < addrs.size()) {
-      // Each iteration picks fanOutSize addresses to run as batch.
-      int batchEnd = Math.min(addrs.size(), i + fanOutSize);
-      List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
-      BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
-      lastBatchCtx = batchRpcCtx;
-      LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
-      for (InetSocketAddress address : addrSubList) {
-        HBaseRpcController rpcController = applyRpcTimeout(controller);
-        // ** WARN ** This is a blocking call if the underlying connection for the rpc client is
-        // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
-        // the write calls. Handling blocking connection means that this should be run in a separate
-        // thread and hence more code complexity. Is it ok to handle only non-blocking connections?
-        batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
-            responsePrototype, ticket, address,
-            new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
-      }
-      if (batchRpcCtx.waitForResults()) {
-        return;
-      }
-      // Entire batch has failed, lets try the next batch.
-      LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
-      i = batchEnd;
-    }
-    Preconditions.checkNotNull(lastBatchCtx);
-    // All the batches failed, mark it a failed rpc.
-    // Propagate the failure reason. We propagate the last batch's last failing rpc reason.
-    // Can we do something better?
-    controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
-    done.run(null);
-  }
-
-  @Override
-  public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
-      Message request, Message responsePrototype, RpcCallback<Message> done) {
-    // There is no reason to use any other implementation of RpcController.
-    Preconditions.checkState(controller instanceof HBaseRpcController);
-    // To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
-    // called once the hedging finishes.
-    CompletableFuture.runAsync(
-      () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
-  }
-}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index f7a65e4..4c85e3d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -18,20 +18,14 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
@@ -83,19 +77,6 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
   }
 
   @Override
-  public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
-      throws UnknownHostException {
-    final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
-        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
-    Set<InetSocketAddress> addresses = new HashSet<>();
-    for (ServerName sn: sns) {
-      addresses.add(createAddr(sn));
-    }
-    return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
-        hedgedRpcFanOut);
-  }
-
-  @Override
   protected void closeInternal() {
     if (shutdownGroupWhenClose) {
       group.shutdownGracefully();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 558fcee..877d9b0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Set;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 
@@ -83,16 +83,6 @@ public interface RpcClient extends Closeable {
       throws IOException;
 
   /**
-   * Creates a channel that can hedge request to multiple underlying channels.
-   * @param sns  Set of servers for underlying channels.
-   * @param user user for the connection.
-   * @param rpcTimeout rpc timeout to use.
-   * @return A hedging rpc channel for this rpc client instance.
-   */
-  RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
-       throws IOException;
-
-  /**
    * Interrupt the connections to the given server. This should be called if the server
    * is known as actually dead. This will not prevent current operation to be retried, and,
    * depending on their own behavior, they may retry on the same server. This can be a feature,
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
new file mode 100644
index 0000000..8bbdce6
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestMasterRegistryHedgedReads {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
+
+  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private static final ExecutorService EXECUTOR =
+    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
+
+  private static AtomicInteger CALLED = new AtomicInteger(0);
+
+  private static volatile int BAD_RESP_INDEX;
+
+  private static volatile Set<Integer> GOOD_RESP_INDEXS;
+
+  private static GetClusterIdResponse RESP =
+    GetClusterIdResponse.newBuilder().setClusterId("id").build();
+
+  public static final class RpcClientImpl implements RpcClient {
+
+    public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
+      MetricsConnection metrics) {
+    }
+
+    @Override
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
+      throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
+      throws IOException {
+      return new RpcChannelImpl();
+    }
+
+    @Override
+    public void cancelConnections(ServerName sn) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasCellBlockSupport() {
+      return false;
+    }
+  }
+
+  public static final class RpcChannelImpl implements RpcChannel {
+
+    @Override
+    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+      // Simulate asynchronous behavior; otherwise all the logic would run in the same thread.
+      EXECUTOR.execute(() -> {
+        int index = CALLED.getAndIncrement();
+        if (index == BAD_RESP_INDEX) {
+          done.run(GetClusterIdResponse.getDefaultInstance());
+        } else if (GOOD_RESP_INDEXS.contains(index)) {
+          done.run(RESP);
+        } else {
+          ((HBaseRpcController) controller).setFailed("inject error");
+          done.run(null);
+        }
+      });
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
+      RpcClient.class);
+    String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
+      .collect(Collectors.joining(","));
+    conf.set(HConstants.MASTER_ADDRS_KEY, masters);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    EXECUTOR.shutdownNow();
+  }
+
+  @Before
+  public void setUp() {
+    CALLED.set(0);
+    BAD_RESP_INDEX = -1;
+    GOOD_RESP_INDEXS = Collections.emptySet();
+  }
+
+  private <T> T logIfError(CompletableFuture<T> future) throws IOException {
+    try {
+      return FutureUtils.get(future);
+    } catch (Throwable t) {
+      LOG.warn("", t);
+      throw t;
+    }
+  }
+
+  @Test
+  public void testAllFailNoHedged() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
+      assertEquals(10, CALLED.get());
+    }
+  }
+
+  @Test
+  public void testAllFailHedged3() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
+    BAD_RESP_INDEX = 5;
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
+      assertEquals(10, CALLED.get());
+    }
+  }
+
+  @Test
+  public void testFirstSucceededNoHedge() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    // the fan-out of 0 will be set to 1, i.e. no hedging
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
+    GOOD_RESP_INDEXS =
+      IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      String clusterId = logIfError(registry.getClusterId());
+      assertEquals(RESP.getClusterId(), clusterId);
+      assertEquals(1, CALLED.get());
+    }
+  }
+
+  @Test
+  public void testSecondRoundSucceededHedge4() throws IOException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
+    GOOD_RESP_INDEXS = Collections.singleton(6);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      String clusterId = logIfError(registry.getClusterId());
+      assertEquals(RESP.getClusterId(), clusterId);
+      UTIL.waitFor(5000, () -> CALLED.get() == 8);
+    }
+  }
+
+  @Test
+  public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
+    GOOD_RESP_INDEXS = Collections.singleton(5);
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      String clusterId = logIfError(registry.getClusterId());
+      assertEquals(RESP.getClusterId(), clusterId);
+      UTIL.waitFor(5000, () -> CALLED.get() == 10);
+      Thread.sleep(1000);
+      // make sure we do not send more requests than the 10 configured masters
+      assertEquals(10, CALLED.get());
+    }
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6f1bb6a..b8098f4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -190,13 +190,6 @@ public final class HConstants {
   public static final String ZK_CONNECTION_REGISTRY_CLASS =
       "org.apache.hadoop.hbase.client.ZKConnectionRegistry";
 
-  /** Configuration to enable hedged reads on master registry **/
-  public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
-      "hbase.client.master_registry.enable_hedged_reads";
-
-  /** Default value for enabling hedging reads on master registry **/
-  public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
-
   /** Parameter name for the master type being backup (waits for primary to go inactive). */
   public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
 
@@ -939,12 +932,6 @@ public final class HConstants {
    */
   public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
 
-  /** Configuration key that controls the fan out of requests in hedged channel implementation. **/
-  public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
-
-  /** Default value for the fan out of hedged requests. **/
-  public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
-
   /**
    * timeout for each read RPC
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
index 6787a11..3c539a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -49,10 +53,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Base for TestFromClientSide* classes.
@@ -85,20 +87,20 @@ class FromClientSideBase {
    * to initialize from scratch. While this is a hack, it saves a ton of time for the full
    * test and de-flakes it.
    */
-  protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+  protected static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
     if (TEST_UTIL == null) {
       return false;
     }
     Configuration conf = TEST_UTIL.getConfiguration();
-    Class confClass = conf.getClass(
-        HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
-    int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
-        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+      ZKConnectionRegistry.class);
+    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+      MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
     return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
   }
 
-  protected static final void initialize(Class registryImpl, int numHedgedReqs, Class<?>... cps)
-      throws Exception {
+  protected static final void initialize(Class<?> registryImpl, int numHedgedReqs, Class<?>... cps)
+    throws Exception {
     // initialize() is called for every unit test, however we only want to reset the cluster state
     // at the end of every parameterized run.
     if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
@@ -122,13 +124,8 @@ class FromClientSideBase {
     conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests
     conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
         ConnectionRegistry.class);
-    if (numHedgedReqs == 1) {
-      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
-    } else {
-      Preconditions.checkArgument(numHedgedReqs > 1);
-      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
-    }
-    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    Preconditions.checkArgument(numHedgedReqs > 0);
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
     StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
     // Multiple masters needed only when hedged reads for master registry are enabled.
     builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index e0e7b71..aa8e10d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -157,13 +157,6 @@ public class TestClientTimeouts {
         throws UnknownHostException {
       return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
     }
-
-    @Override
-    public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
-        throws UnknownHostException {
-      Preconditions.checkArgument(sns != null && sns.size() == 1);
-      return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
-    }
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 4a83527..ac02e9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
index 65b2d0b..0e0060c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
 import static org.junit.Assert.assertEquals;
-import java.net.UnknownHostException;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -77,14 +78,15 @@ public class TestMasterRegistry {
   /**
    * Makes sure the master registry parses the master end points in the configuration correctly.
    */
-  @Test public void testMasterAddressParsing() throws UnknownHostException {
+  @Test
+  public void testMasterAddressParsing() throws IOException {
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     int numMasters = 10;
     conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
     try (MasterRegistry registry = new MasterRegistry(conf)) {
       List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
       // Half of them would be without a port, duplicates are removed.
-      assertEquals(numMasters/2 + 1, parsedMasters.size());
+      assertEquals(numMasters / 2 + 1, parsedMasters.size());
       // Sort in the increasing order of port numbers.
       Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
       for (int i = 0; i < parsedMasters.size(); i++) {
@@ -100,18 +102,14 @@ public class TestMasterRegistry {
     }
   }
 
-  @Test public void testRegistryRPCs() throws Exception {
+  @Test
+  public void testRegistryRPCs() throws Exception {
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
-    final int size = activeMaster.getMetaRegionLocationCache().
-      getMetaRegionLocations().get().size();
-    for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
-      if (numHedgedReqs == 1) {
-        conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
-      } else {
-        conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
-      }
-      conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    final int size =
+      activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
+    for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
+      conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
       try (MasterRegistry registry = new MasterRegistry(conf)) {
         // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
         // because not all replicas had made it up before test started.
@@ -119,9 +117,9 @@ public class TestMasterRegistry {
         assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
         assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
         List<HRegionLocation> metaLocations =
-            Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
-        List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
-            .getMetaRegionLocations().get();
+          Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
+        List<HRegionLocation> actualMetaLocations =
+          activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
         Collections.sort(metaLocations);
         Collections.sort(actualMetaLocations);
         assertEquals(actualMetaLocations, metaLocations);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 328bbbf..e10f342 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -72,6 +73,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
@@ -115,11 +117,11 @@ public class TestScannersFromClientSide {
   }
 
   @Parameterized.Parameters
-  public static Collection parameters() {
-    return Arrays.asList(new Object[][]{
-        {MasterRegistry.class, 1},
-        {MasterRegistry.class, 2},
-        {ZKConnectionRegistry.class, 1}
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] {
+        { MasterRegistry.class, 1},
+        { MasterRegistry.class, 2},
+        { ZKConnectionRegistry.class, 1}
     });
   }
 
@@ -134,21 +136,21 @@ public class TestScannersFromClientSide {
    * to initialize from scratch. While this is a hack, it saves a ton of time for the full
    * test and de-flakes it.
    */
-  private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
     // initialize() is called for every unit test, however we only want to reset the cluster state
     // at the end of every parameterized run.
     if (TEST_UTIL == null) {
       return false;
     }
     Configuration conf = TEST_UTIL.getConfiguration();
-    Class confClass = conf.getClass(
-        HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
-    int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
-        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+    Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+      ZKConnectionRegistry.class);
+    int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+      MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
     return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
   }
 
-  public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
+  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
     if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
       return;
     }
@@ -161,13 +163,8 @@ public class TestScannersFromClientSide {
     conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
     conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
         ConnectionRegistry.class);
-    if (numHedgedReqs == 1) {
-      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
-    } else {
-      Preconditions.checkArgument(numHedgedReqs > 1);
-      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
-    }
-    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    Preconditions.checkArgument(numHedgedReqs > 0);
+    conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
     StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
     // Multiple masters needed only when hedged reads for master registry are enabled.
     builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index c3bf1c2..87561ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -26,30 +26,27 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.StringUtils;
-import org.junit.Assume;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
-import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -367,105 +363,6 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  /**
-   * Tests the various request fan out values using a simple RPC hedged across a mix of running and
-   * failing servers.
-   */
-  @Test
-  @Ignore
-  public void testHedgedAsyncEcho() throws Exception {
-    // Hedging is not supported for blocking connection types.
-    Assume.assumeFalse(this instanceof TestBlockingIPC);
-    List<RpcServer> rpcServers = new ArrayList<>();
-    List<InetSocketAddress> addresses = new ArrayList<>();
-    // Create a mix of running and failing servers.
-    final int numRunningServers = 5;
-    final int numFailingServers = 3;
-    final int numServers = numRunningServers + numFailingServers;
-    for (int i = 0; i < numRunningServers; i++) {
-      RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
-          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-          new FifoRpcScheduler(CONF, 1));
-      rpcServer.start();
-      addresses.add(rpcServer.getListenerAddress());
-      rpcServers.add(rpcServer);
-    }
-    for (int i = 0; i < numFailingServers; i++) {
-      RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
-          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-          new FifoRpcScheduler(CONF, 1));
-      rpcServer.start();
-      addresses.add(rpcServer.getListenerAddress());
-      rpcServers.add(rpcServer);
-    }
-    Configuration conf = HBaseConfiguration.create();
-    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
-      // Try out various fan out values starting from 1 -> numServers.
-      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
-        // Update the client's underlying conf, should be ok for the test.
-        LOG.debug("Testing with request fan out: " + reqFanOut);
-        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
-        Interface stub = newStub(client, addresses);
-        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
-        stub.echo(new HBaseRpcControllerImpl(),
-            EchoRequestProto.newBuilder().setMessage("hello").build(), done);
-        TestProtos.EchoResponseProto responseProto = done.get();
-        assertNotNull(responseProto);
-        assertEquals("hello", responseProto.getMessage());
-        LOG.debug("Ended test with request fan out: " + reqFanOut);
-      }
-    } finally {
-      for (RpcServer rpcServer: rpcServers) {
-        rpcServer.stop();
-      }
-    }
-  }
-
-  @Test
-  public void testHedgedAsyncTimeouts() throws Exception {
-    // Hedging is not supported for blocking connection types.
-    Assume.assumeFalse(this instanceof TestBlockingIPC);
-    List<RpcServer> rpcServers = new ArrayList<>();
-    List<InetSocketAddress> addresses = new ArrayList<>();
-    final int numServers = 3;
-    for (int i = 0; i < numServers; i++) {
-      RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
-          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
-          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
-          new FifoRpcScheduler(CONF, 1));
-      rpcServer.start();
-      addresses.add(rpcServer.getListenerAddress());
-      rpcServers.add(rpcServer);
-    }
-    Configuration conf = HBaseConfiguration.create();
-    int timeout = 100;
-    int pauseTime = 1000;
-    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
-      // Try out various fan out values starting from 1 -> numServers.
-      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
-        // Update the client's underlying conf, should be ok for the test.
-        LOG.debug("Testing with request fan out: " + reqFanOut);
-        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
-        Interface stub = newStub(client, addresses);
-        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
-        pcrc.setCallTimeout(timeout);
-        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
-        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
-        assertNull(callback.get());
-        // Make sure the controller has the right exception propagated.
-        assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
-        LOG.debug("Ended test with request fan out: " + reqFanOut);
-      }
-    } finally {
-      for (RpcServer rpcServer: rpcServers) {
-        rpcServer.stop();
-      }
-    }
-  }
-
-
   @Test
   public void testAsyncRemoteError() throws IOException {
     AbstractRpcClient<?> client = createRpcClient(CONF);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
index 6adfa46..ab282e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -31,9 +29,11 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -67,17 +67,6 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
       User.getCurrent(), 0));
   }
 
-  public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
-      throws IOException {
-    Set<ServerName> serverNames = new HashSet<>();
-    for (InetSocketAddress addr: addrs) {
-      serverNames.add(ServerName.valueOf(
-          addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
-    }
-    return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
-        serverNames, User.getCurrent(), 0));
-  }
-
   @Override
   public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
       throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index 27c1235..2881eb7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -24,15 +24,12 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
@@ -52,8 +48,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 
 @Category(MediumTests.class)
 public class TestRpcClientLeaks {
@@ -96,14 +90,6 @@ public class TestRpcClientLeaks {
       };
     }
 
-    // To keep the registry paths happy.
-    @Override
-    public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
-        throws UnknownHostException {
-      Preconditions.checkState(sns != null && sns.size() == 1);
-      return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout);
-    }
-
     public static void enableThrowExceptions() {
       throwException = true;
     }