You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2020/01/14 16:30:42 UTC

[hbase] 06/06: HBASE-23305: Master based registry implementation (#954)

This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch HBASE-18095/client-locate-meta-no-zookeeper
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d9bb034c9439d034f23dd0b66faafce9726e2fa5
Author: Bharath Vissapragada <bh...@apache.org>
AuthorDate: Tue Jan 14 08:24:07 2020 -0800

    HBASE-23305: Master based registry implementation (#954)
    
    Implements a master based registry for clients.
    
     - Supports hedged RPCs (fan out configured via configs).
     - Parameterized existing client tests to run with multiple registry combinations.
     - Added unit-test coverage for the new registry implementation.
    
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: stack <st...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hbase/client/ConnectionRegistryFactory.java    |   4 +-
 .../apache/hadoop/hbase/client/MasterRegistry.java | 226 +++++++++++
 .../MasterRegistryFetchException.java}             |  32 +-
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  57 +--
 .../apache/hadoop/hbase/ipc/BlockingRpcClient.java |   7 +-
 .../apache/hadoop/hbase/ipc/HedgedRpcChannel.java  | 274 ++++++++++++++
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java    |  34 +-
 .../org/apache/hadoop/hbase/ipc/RpcClient.java     |  19 +-
 .../hbase/client/TestConnectionRegistryLeak.java   |   3 +-
 .../java/org/apache/hadoop/hbase/HConstants.java   |  20 +-
 .../apache/hadoop/hbase/util/PrettyPrinter.java    |  23 +-
 .../org/apache/hadoop/hbase/TestTableName.java     |  16 +-
 .../apache/hadoop/hbase/util/JVMClusterUtil.java   |   7 +
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |   3 +
 .../hbase/client/DummyConnectionRegistry.java      |   3 +-
 .../hadoop/hbase/client/TestFromClientSide.java    | 420 +++++++++++----------
 .../client/TestFromClientSideWithCoprocessor.java  |  23 +-
 .../hadoop/hbase/client/TestMasterRegistry.java    | 125 ++++++
 .../hbase/client/TestScannersFromClientSide.java   | 136 ++++---
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 120 +++++-
 .../hbase/ipc/TestProtobufRpcServiceImpl.java      |  25 +-
 21 files changed, 1243 insertions(+), 334 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
index 80d358b..9308443 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -27,9 +28,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 final class ConnectionRegistryFactory {
 
-  static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
-      "hbase.client.connection.registry.impl";
-
   private ConnectionRegistryFactory() {
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
new file mode 100644
index 0000000..5680847
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
+import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
+
+/**
+ * Master based registry implementation. Makes RPCs to the configured master addresses from config
+ * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
+ *
+ * It supports hedged reads, which can be enabled by setting
+ * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to true. The
+ * fan out size of each request batch is controlled by
+ * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
+ *
+ * TODO: Handle changes to the configuration dynamically without having to restart the client.
+ */
+@InterfaceAudience.Private
+public class MasterRegistry implements ConnectionRegistry {
+  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+
+  // Configured list of masters to probe the meta information from.
+  private final Set<ServerName> masterServers;
+
+  // RPC client used to talk to the masters.
+  private final RpcClient rpcClient;
+  private final RpcControllerFactory rpcControllerFactory;
+  private final int rpcTimeoutMs;
+
+  MasterRegistry(Configuration conf) {
+    boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
+        MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
+    Configuration finalConf;
+    if (!hedgedReadsEnabled) {
+      // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
+      // the configuration so that other places reusing this reference are not affected.
+      finalConf = new Configuration(conf);
+      finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
+    } else {
+      finalConf = conf;
+    }
+    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    masterServers = new HashSet<>();
+    parseMasterAddrs(finalConf);
+    rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
+    rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
+  }
+
+  /**
+   * @return Stub needed to make RPC using a hedged channel to the master end points.
+   */
+  private ClientMetaService.Interface getMasterStub() throws IOException {
+    return ClientMetaService.newStub(
+        rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
+  }
+
+  /**
+   * Parses the list of master addresses from the provided configuration. Supported format is
+   * comma separated host[:port] values. If no port number is specified, default master port is
+   * assumed.
+   * @param conf Configuration to parse from.
+   */
+  private void parseMasterAddrs(Configuration conf) {
+    String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
+    for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+      HostAndPort masterHostPort =
+          HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+      masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
+    }
+    Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
+  }
+
+  @VisibleForTesting
+  public Set<ServerName> getParsedMasterServers() {
+    return Collections.unmodifiableSet(masterServers);
+  }
+
+  /**
+   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
+   * the rpc finishes and the response is propagated to the passed future.
+   * @param future Result future to which the rpc response is propagated.
+   * @param isValidResp Checks if the rpc response has a valid result.
+   * @param transformResult Transforms the result to a different form as expected by callers.
+   * @param hrc RpcController instance for this rpc.
+   * @param debug Debug message passed along to the caller in case of exceptions.
+   * @param <T> RPC result type.
+   * @param <R> Transformed type of the result.
+   * @return A call back that can be embedded in the non-blocking rpc call.
+   */
+  private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
+      Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
+      final String debug) {
+    return rpcResult -> {
+      if (rpcResult == null) {
+        // The rpc failed. Complete with the failure and stop, rpcResult must not be dereferenced.
+        future.completeExceptionally(
+            new MasterRegistryFetchException(masterServers, hrc.getFailed()));
+      } else if (!isValidResp.test(rpcResult)) {
+        // Rpc returned ok, but result was malformed.
+        future.completeExceptionally(new IOException(
+            String.format("Invalid result for request %s. Will be retried", debug)));
+      } else {
+        future.complete(transformResult.apply(rpcResult));
+      }
+    };
+  }
+
+  /**
+   * Simple helper to transform the result of getMetaRegionLocations() rpc.
+   */
+  private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
+    List<HRegionLocation> regionLocations = new ArrayList<>();
+    resp.getMetaLocationsList().forEach(
+      location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
+    return new RegionLocations(regionLocations);
+  }
+
+  @Override
+  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
+    CompletableFuture<RegionLocations> result = new CompletableFuture<>();
+    HBaseRpcController hrc = rpcControllerFactory.newController();
+    RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
+      (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
+        "getMetaRegionLocations()");
+    try {
+      getMasterStub().getMetaRegionLocations(
+          hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
+    } catch (IOException e) {
+      result.completeExceptionally(e);
+    }
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    CompletableFuture<String> result = new CompletableFuture<>();
+    HBaseRpcController hrc = rpcControllerFactory.newController();
+    RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
+        GetClusterIdResponse::hasClusterId,  GetClusterIdResponse::getClusterId, hrc,
+        "getClusterId()");
+    try {
+      getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
+    } catch (IOException e) {
+      result.completeExceptionally(e);
+    }
+    return result;
+  }
+
+  private ServerName transformServerName(GetActiveMasterResponse resp) {
+    return ProtobufUtil.toServerName(resp.getServerName());
+  }
+
+  @Override
+  public CompletableFuture<ServerName> getActiveMaster() {
+    CompletableFuture<ServerName> result = new CompletableFuture<>();
+    HBaseRpcController hrc = rpcControllerFactory.newController();
+    RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
+        GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
+        "getActiveMaster()");
+    try {
+      getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
+    } catch (IOException e) {
+      result.completeExceptionally(e);
+    }
+    return result;
+  }
+
+  @Override
+  public void close() {
+    if (rpcClient != null) {
+      rpcClient.close();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
similarity index 53%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
index 80d358b..18871be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java
@@ -15,31 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.client;
+package org.apache.hadoop.hbase.exceptions;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
+import java.util.Set;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.PrettyPrinter;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Factory class to get the instance of configured connection registry.
+ * Exception thrown when a master registry RPC fails in client. The exception includes the list of
+ * masters to which RPC was attempted and the last exception encountered. Prior exceptions are
+ * included in the logs.
  */
 @InterfaceAudience.Private
-final class ConnectionRegistryFactory {
-
-  static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
-      "hbase.client.connection.registry.impl";
-
-  private ConnectionRegistryFactory() {
-  }
-
-  /**
-   * @return The connection registry implementation to use.
-   */
-  static ConnectionRegistry getRegistry(Configuration conf) {
-    Class<? extends ConnectionRegistry> clazz = conf.getClass(
-        CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
-        ConnectionRegistry.class);
-    return ReflectionUtils.newInstance(clazz, conf);
+public class MasterRegistryFetchException extends HBaseIOException {
+  public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
+    super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
+        failure);
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 629efe6..72b9f83 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,22 +20,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -43,18 +27,15 @@ import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
@@ -69,7 +50,22 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
-
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 
 /**
@@ -106,7 +102,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
       justification="the rest of the system which live in the different package can use")
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS =
+      new HashMap<>();
 
   static {
     TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
@@ -217,7 +214,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
         // have some pending calls on connection so we should not shutdown the connection outside.
         // The connection itself will disconnect if there is no pending call for maxIdleTime.
         if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
-          if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
+          }
           connections.removeValue(conn.remoteId(), conn);
           conn.cleanupConnection();
         }
@@ -398,7 +397,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     }
   }
 
-  private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+  Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
       final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
       final RpcCallback<Message> callback) {
     final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
@@ -435,9 +434,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     } catch (Exception e) {
       call.setException(toIOE(e));
     }
+    return call;
   }
 
-  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
+  InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
     InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
     if (addr.isUnresolved()) {
       throw new UnknownHostException("can not resolve " + sn.getServerName());
@@ -527,6 +527,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
     return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
   }
 
+  @Override
+  public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+      throws UnknownHostException {
+    // Check HedgedRpcChannel implementation for detailed comments.
+    throw new UnsupportedOperationException("Hedging not supported for this implementation.");
+  }
+
   private static class AbstractRpcChannel {
 
     protected final InetSocketAddress addr;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
index f84c308..22eca53 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.net.SocketAddress;
-
 import javax.net.SocketFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Does RPC against a cluster. Manages connections per regionserver in the cluster.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
new file mode 100644
index 0000000..7b681e0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.PrettyPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+
+/**
+ * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
+ * First received response is returned to the caller. This abstracts out the logic needed to batch
+ * requests to multiple end points underneath and presents itself as a single logical RpcChannel to
+ * the client.
+ *
+ * Hedging Details:
+ * ---------------
+ * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
+ * end points to make the call to. We do multiple iterations until we get a proper response to the
+ * rpc call or all the service addresses are exhausted, whichever happens first. Size of each
+ * batch is configurable and is also known as 'fanOutSize'.
+ *
+ * - We randomize the addresses up front so that the batch order per client is non deterministic.
+ * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
+ * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to be mindful of
+ * the load on the client and server side when configuring the fan out.
+ *
+ * - In a happy case, once we receive a response from one end point, we cancel all the
+ * other inflight rpcs in the same batch and return the response to the caller. If we do not get a
+ * valid response from any address end point, we propagate the error back to the caller.
+ *
+ * - Rpc timeouts are applied to every hedged rpc.
+ *
+ * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
+ * be hedged (for example: cluster state changing rpcs).
+ *
+ * (TODO) Retries and Adaptive hedging policy:
+ * ------------------------------------------
+ *
+ * - No retries are handled at the channel level. Retries can be built in upper layers. However the
+ * question is, do we even need retries? Hedging in fact is a substitute for retries.
+ *
+ * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
+ * policy more adaptive. In most happy cases, the rpcs from the first few end points should return
+ * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
+ * is not needed. So, the idea is to make this request pattern pluggable so that the requests are
+ * hedged only when needed.
+ */
+class HedgedRpcChannel implements RpcChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
+
+  /**
+   * Currently hedging is only supported for non-blocking connection implementation types because
+   * the channel implementation inherently relies on the connection implementation being async.
+   * Refer to the comments in doCallMethod().
+   */
+  private final NettyRpcClient rpcClient;
+  // List of service addresses to hedge the requests to.
+  private final List<InetSocketAddress> addrs;
+  private final User ticket;
+  private final int rpcTimeout;
+  // Controls the size of request fan out (number of rpcs per a single batch).
+  private final int fanOutSize;
+
+  /**
+   * A simple rpc call back implementation to notify the batch context if any rpc is successful.
+   */
+  private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
+    private  final BatchRpcCtx batchRpcCtx;
+    private final HBaseRpcController rpcController;
+    BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
+      this.batchRpcCtx = batchRpcCtx;
+      this.rpcController = rpcController;
+    }
+    @Override
+    public void run(Message result) {
+      batchRpcCtx.setResultIfNotSet(result, rpcController);
+    }
+  }
+
+  /**
+   * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
+   * synchronize on multiple RPCs to different end points fetching the result. All the methods are
+   * thread-safe.
+   */
+  private static class BatchRpcCtx {
+    // Result set by the thread finishing first. Set only once.
+    private final AtomicReference<Message> result = new AtomicReference<>();
+    // Caller waits on this latch being set.
+    // We set this to 1, so that the first successful RPC result is returned to the client.
+    private CountDownLatch resultsReady = new CountDownLatch(1);
+    // Failed rpc book-keeping.
+    private AtomicInteger failedRpcCount = new AtomicInteger();
+    // All the call handles for this batch.
+    private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
+
+    // Target addresses.
+    private final List<InetSocketAddress> addresses;
+    // Called when the result is ready.
+    private final RpcCallback<Message> callBack;
+    // Last failed rpc's exception. Used to propagate the reason to the controller.
+    private IOException lastFailedRpcReason;
+
+
+    BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
+      this.addresses = addresses;
+      this.callBack = Preconditions.checkNotNull(callBack);
+    }
+
+    /**
+     * Sets the result only if it is not already set by another thread. Thread that successfully
+     * sets the result also counts down the latch.
+     * @param result Result to be set.
+     */
+    public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
+      if (rpcController.failed()) {
+        incrementFailedRpcs(rpcController.getFailed());
+        return;
+      }
+      if (this.result.compareAndSet(null, result)) {
+        resultsReady.countDown();
+        // Cancel all pending in flight calls. Walk a snapshot, the list may still be appended to.
+        for (Call call: new ArrayList<>(callsInFlight)) {
+          // It is ok to do it for all calls as it is a no-op if the call is already done.
+          final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
+              "for the same rpc already succeeded. This is not needed anymore.", call);
+          call.setException(new CallCancelledException(exceptionMsg));
+        }
+      }
+    }
+
+    /**
+     * Waits until the results are populated and calls the callback if the call is successful.
+     * @return true for successful rpc and false otherwise.
+     */
+    public boolean waitForResults() {
+      try {
+        // We do not set a timeout on await() because we rely on the underlying RPCs to time out if
+        // something on the remote is broken. Worst case, we wait for the rpc timeout to kick in.
+        resultsReady.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
+      }
+      Message message = result.get();
+      if (message != null) {
+        callBack.run(message);
+        return true;
+      }
+      return false;
+    }
+
+    public void addCallInFlight(Call c) {
+      callsInFlight.add(c);
+    }
+
+    public void incrementFailedRpcs(IOException reason) {
+      if (failedRpcCount.incrementAndGet() == addresses.size()) {
+        lastFailedRpcReason = reason;
+        // All the rpcs in this batch have failed. Wake up the waiting threads.
+        resultsReady.countDown();
+      }
+    }
+
+    public IOException getLastFailedRpcReason() {
+      return lastFailedRpcReason;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
+    }
+  }
+
+  public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
+      User ticket, int rpcTimeout, int fanOutSize) {
+    this.rpcClient = rpcClient;
+    this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
+    Preconditions.checkArgument(this.addrs.size() >= 1);
+    // For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
+    // order, creating hot spots on the service end points.
+    Collections.shuffle(this.addrs);
+    this.ticket = ticket;
+    this.rpcTimeout = rpcTimeout;
+    // fanOutSize controls the number of hedged RPCs per batch.
+    this.fanOutSize = fanOutSize;
+  }
+
+  private HBaseRpcController applyRpcTimeout(RpcController controller) {
+    HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
+    int rpcTimeoutToSet =
+        hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
+    HBaseRpcController response = new HBaseRpcControllerImpl();
+    response.setCallTimeout(rpcTimeoutToSet);
+    return response;
+  }
+
+  private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
+      Message request, Message responsePrototype, RpcCallback<Message> done) {
+    int i = 0;
+    BatchRpcCtx lastBatchCtx = null;
+    while (i < addrs.size()) {
+      // Each iteration picks fanOutSize addresses to run as batch.
+      int batchEnd = Math.min(addrs.size(), i + fanOutSize);
+      List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
+      BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
+      lastBatchCtx = batchRpcCtx;
+      LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
+      for (InetSocketAddress address : addrSubList) {
+        HBaseRpcController rpcController = applyRpcTimeout(controller);
+        // ** WARN ** This is a blocking call if the underlying connection for the rpc client is
+        // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
+        // the write calls. Handling blocking connection means that this should be run in a separate
+        // thread and hence more code complexity. Is it ok to handle only non-blocking connections?
+        batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
+            responsePrototype, ticket, address,
+            new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
+      }
+      if (batchRpcCtx.waitForResults()) {
+        return;
+      }
+      // Entire batch has failed, let's try the next batch.
+      LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
+      i = batchEnd;
+    }
+    Preconditions.checkNotNull(lastBatchCtx);
+    // All the batches failed, mark it a failed rpc.
+    // Propagate the failure reason. We propagate the last batch's last failing rpc reason.
+    // Can we do something better?
+    controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
+    done.run(null);
+  }
+
+  @Override
+  public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
+      Message request, Message responsePrototype, RpcCallback<Message> done) {
+    // There is no reason to use any other implementation of RpcController.
+    Preconditions.checkState(controller instanceof HBaseRpcController);
+    // To make the channel non-blocking, we run the actual doCallMethod() async. The callback is
+    // called once the hedging finishes.
+    CompletableFuture.runAsync(
+      () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 61dedbb..c4f70b0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -17,21 +17,26 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * Netty client for the requests and responses.
@@ -75,6 +80,19 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
   }
 
   @Override
+  public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
+      throws UnknownHostException {
+    final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+    Set<InetSocketAddress> addresses = new HashSet<>();
+    for (ServerName sn: sns) {
+      addresses.add(createAddr(sn));
+    }
+    return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
+        hedgedRpcFanOut);
+  }
+
+  @Override
   protected void closeInternal() {
     if (shutdownGroupWhenClose) {
       group.shutdownGracefully();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 0e00695..558fcee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
-
 import java.io.Closeable;
 import java.io.IOException;
-
+import java.util.Set;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 
 /**
  * Interface for RpcClient implementations so ConnectionManager can handle it.
@@ -84,6 +83,16 @@ public interface RpcClient extends Closeable {
       throws IOException;
 
   /**
+   * Creates a channel that can hedge requests across multiple underlying channels.
+   * @param sns  Set of servers for underlying channels.
+   * @param user user for the connection.
+   * @param rpcTimeout rpc timeout to use.
+   * @return A hedging rpc channel for this rpc client instance.
+   */
+  RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
+       throws IOException;
+
+  /**
    * Interrupt the connections to the given server. This should be called if the server
    * is known as actually dead. This will not prevent current operation to be retried, and,
    * depending on their own behavior, they may retry on the same server. This can be a feature,
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
index f02ec42..561b1f5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -70,7 +71,7 @@ public class TestConnectionRegistryLeak {
 
   @BeforeClass
   public static void setUp() {
-    CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+    CONF.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
         ConnectionRegistryForTest.class, ConnectionRegistry.class);
   }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 132d3e0..ef525d7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -180,10 +180,17 @@ public final class HConstants {
   public static final String MASTER_INFO_PORT = "hbase.master.info.port";
 
   /** Configuration key for the list of master host:ports **/
-  public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";
+  public static final String MASTER_ADDRS_KEY = "hbase.masters";
 
   public static final String MASTER_ADDRS_DEFAULT =  "localhost:" + DEFAULT_MASTER_PORT;
 
+  /** Configuration to enable hedged reads on master registry **/
+  public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
+      "hbase.client.master_registry.enable_hedged_reads";
+
+  /** Default value for enabling hedging reads on master registry **/
+  public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
+
   /** Parameter name for the master type being backup (waits for primary to go inactive). */
   public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
 
@@ -909,6 +916,12 @@ public final class HConstants {
    */
   public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
 
+  /** Configuration key that controls the fan out of requests in hedged channel implementation. **/
+  public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
+
+  /** Default value for the fan out of hedged requests. **/
+  public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
   /**
    * timeout for each read RPC
    */
@@ -940,6 +953,11 @@ public final class HConstants {
    */
   public static final long NO_SEQNUM = -1;
 
+  /**
+   * Registry implementation to be used on the client side.
+   */
+  public static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
+      "hbase.client.registry.impl";
 
   /*
    * cluster replication constants.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
index 147e916..ff7064b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java
@@ -19,9 +19,12 @@
 
 package org.apache.hadoop.hbase.util;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.HBaseException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -29,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
-public class PrettyPrinter {
+public final class PrettyPrinter {
 
   private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class);
 
@@ -117,7 +120,7 @@ public class PrettyPrinter {
       sb.append(" DAY").append(days == 1 ? "" : "S");
     }
 
-    if (hours > 0 ) {
+    if (hours > 0) {
       sb.append(days > 0 ? " " : "");
       sb.append(hours);
       sb.append(" HOUR").append(hours == 1 ? "" : "S");
@@ -188,4 +191,18 @@ public class PrettyPrinter {
     return ttl;
   }
 
+  /**
+   * Pretty prints a collection of any type to a string. Relies on toString() implementation of the
+   * object type.
+   * @param collection collection to pretty print.
+   * @return Pretty printed string for the collection.
+   */
+  public static String toString(Collection<?> collection) {
+    List<String> stringList = new ArrayList<>();
+    for (Object o: collection) {
+      stringList.add(Objects.toString(o));
+    }
+    return "[" + String.join(",", stringList) + "]";
+  }
+
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
index 43a384a..7e44399 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java
@@ -51,7 +51,21 @@ public class TestTableName extends TestWatcher {
    */
   @Override
   protected void starting(Description description) {
-    tableName = TableName.valueOf(description.getMethodName());
+    tableName = TableName.valueOf(cleanUpTestName(description.getMethodName()));
+  }
+
+  /**
+   * Helper to handle parameterized method names. Unlike regular test methods, parameterized method
+   * names look like 'foo[x]'. This is problematic for tests that use this name for HBase tables.
+   * This helper strips out the parameter suffixes.
+   * @return current test method name without the parameterized suffix.
+   */
+  private static String cleanUpTestName(String methodName) {
+    int index = methodName.indexOf('[');
+    if (index == -1) {
+      return methodName;
+    }
+    return methodName.substring(0, index);
   }
 
   public TableName getTableName() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
index 5c6ad95..cfa6f75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
@@ -26,12 +26,14 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * Utility used running a cluster all in the one JVM.
@@ -136,6 +138,11 @@ public class JVMClusterUtil {
     } catch (Exception e) {
       throw new IOException(e);
     }
+    // Needed if a master based registry is configured for internal cluster connections. Here, we
+    // just add the current master host port since we do not know other master addresses up front
+    // in mini cluster tests.
+    c.set(HConstants.MASTER_ADDRS_KEY,
+        Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
     return new JVMClusterUtil.MasterThread(server, index);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index f1e91de..d1f2f1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1128,6 +1128,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     this.hbaseCluster =
         new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
             option.getRsPorts(), option.getMasterClass(), option.getRsClass());
+    // Populate the master address configuration from mini cluster configuration.
+    conf.set(HConstants.MASTER_ADDRS_KEY,
+        c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
     // Don't leave here till we've done a successful scan of the hbase:meta
     Table t = getConnection().getTable(TableName.META_TABLE_NAME);
     ResultScanner s = t.getScanner(new Scan());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
index a669362..c9d67f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 
@@ -28,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
 public class DummyConnectionRegistry implements ConnectionRegistry {
 
   public static final String REGISTRY_IMPL_CONF_KEY =
-      ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+      HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
 
   @Override
   public CompletableFuture<RegionLocations> getMetaRegionLocations() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 61d78ce..4f4fe79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,10 +28,10 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -64,7 +64,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TestTableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -106,23 +108,28 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.NonRepeatedEnvironmentEdge;
 import org.apache.hadoop.hbase.util.TableDescriptorChecker;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assume;
 import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * Run tests that use the HBase clients; {@link Table}.
  * Sets up the HBase mini cluster once at start and runs through all client tests.
  * Each creates a table named for the method and does its stuff against that.
+ *
+ * Parameterized to run with different registry implementations.
  */
 @Category({LargeTests.class, ClientTests.class})
 @SuppressWarnings ("deprecation")
+@RunWith(Parameterized.class)
 public class TestFromClientSide {
 
   @ClassRule
@@ -131,7 +138,7 @@ public class TestFromClientSide {
 
   // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide.
   private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
-  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static HBaseTestingUtility TEST_UTIL;
   private static byte [] ROW = Bytes.toBytes("testRow");
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
   private static final byte[] INVALID_FAMILY = Bytes.toBytes("invalidTestFamily");
@@ -139,10 +146,54 @@ public class TestFromClientSide {
   private static byte [] VALUE = Bytes.toBytes("testValue");
   protected static int SLAVES = 3;
 
-  @Rule
-  public TestName name = new TestName();
+  @Rule public TestTableName name = new TestTableName();
+
+  // To keep the child classes happy.
+  TestFromClientSide() {}
+
+  public TestFromClientSide(Class registry, int numHedgedReqs) throws Exception {
+    initialize(registry, numHedgedReqs, MultiRowMutationEndpoint.class);
+  }
+
+  @Parameterized.Parameters
+  public static Collection parameters() {
+    return Arrays.asList(new Object[][] {
+        { MasterRegistry.class, 1},
+        { MasterRegistry.class, 2},
+        { ZKConnectionRegistry.class, 1}
+    });
+  }
+
+  /**
+   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
+   * there is no easy way to restart the test cluster after each parameterized run. Annotation
+   * BeforeParam does not work either because it runs before parameterization and hence does not
+   * have access to the test parameters (which is weird).
+   *
+   * This *hack* checks if the current instance of test cluster configuration has the passed
+   * parameterized configs. In such a case, we can just reuse the cluster for test and do not need
+   * to initialize from scratch. While this is a hack, it saves a ton of time for the full
+   * test and de-flakes it.
+   */
+  private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
+    if (TEST_UTIL == null) {
+      return false;
+    }
+    Configuration conf = TEST_UTIL.getConfiguration();
+    Class confClass = conf.getClass(
+        HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+    int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
+  }
 
-  protected static final void initialize(Class<?>... cps) throws Exception {
+  protected static final void initialize(Class registryImpl, int numHedgedReqs, Class<?>... cps)
+      throws Exception {
+    // initialize() is called for every unit test, however we only want to reset the cluster state
+    // at the end of every parameterized run.
+    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
+      return;
+    }
     // Uncomment the following lines if more verbosity is needed for
     // debugging (see HBASE-12285 for details).
     // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -150,22 +201,35 @@ public class TestFromClientSide {
     // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     // make sure that we do not get the same ts twice, see HBASE-19731 for more details.
     EnvironmentEdgeManager.injectEdge(new NonRepeatedEnvironmentEdge());
+    if (TEST_UTIL != null) {
+      // We reached end of a parameterized run, clean up.
+      TEST_UTIL.shutdownMiniCluster();
+    }
+    TEST_UTIL = new HBaseTestingUtility();
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       Arrays.stream(cps).map(Class::getName).toArray(String[]::new));
     conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests
-    // We need more than one region server in this test
-    TEST_UTIL.startMiniCluster(SLAVES);
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    initialize(MultiRowMutationEndpoint.class);
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
+        ConnectionRegistry.class);
+    if (numHedgedReqs == 1) {
+      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+    } else {
+      Preconditions.checkArgument(numHedgedReqs > 1);
+      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+    }
+    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+    // Multiple masters needed only when hedged reads for master registry are enabled.
+    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);
+    TEST_UTIL.startMiniCluster(builder.build());
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
+    if (TEST_UTIL != null) {
+      TEST_UTIL.shutdownMiniCluster();
+    }
   }
 
   /**
@@ -173,7 +237,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testDuplicateAppend() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getMethodName());
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getTableName());
     Map<String, String> kvs = new HashMap<>();
     kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
     hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
@@ -181,11 +245,11 @@ public class TestFromClientSide {
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
-    // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
+    // Client will retry because rpc timeout is smaller than the sleep time of the first rpc call
     c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
 
     try (Connection connection = ConnectionFactory.createConnection(c);
-        Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
+        Table table = connection.getTableBuilder(name.getTableName(), null)
           .setOperationTimeout(3 * 1000).build()) {
       Append append = new Append(ROW);
       append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
@@ -209,7 +273,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testKeepDeletedCells() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     final byte[] FAMILY = Bytes.toBytes("family");
     final byte[] C0 = Bytes.toBytes("c0");
 
@@ -275,7 +339,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testPurgeFutureDeletes() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     final byte[] ROW = Bytes.toBytes("row");
     final byte[] FAMILY = Bytes.toBytes("family");
     final byte[] COLUMN = Bytes.toBytes("column");
@@ -328,7 +392,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testGetConfiguration() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
     Configuration conf = TEST_UTIL.getConfiguration();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILIES)) {
@@ -342,7 +406,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testWeirdCacheBehaviour() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"),
         Bytes.toBytes("trans-type"), Bytes.toBytes("trans-date"),
         Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
@@ -383,7 +447,7 @@ public class TestFromClientSide {
   }
 
   private void deleteColumns(Table ht, String value, String keyPrefix)
-  throws IOException {
+      throws IOException {
     ResultScanner scanner = buildScanner(keyPrefix, value, ht);
     Iterator<Result> it = scanner.iterator();
     int count = 0;
@@ -468,8 +532,8 @@ public class TestFromClientSide {
    */
   @Test
   public void testFilterAcrossMultipleRegions()
-  throws IOException, InterruptedException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+      throws IOException, InterruptedException {
+    final TableName tableName = name.getTableName();
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
       int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
       assertRowCount(t, rowCount);
@@ -555,8 +619,7 @@ public class TestFromClientSide {
    * @param t Table to split.
    * @return Map of regions to servers.
    */
-  private List<HRegionLocation> splitTable(final Table t)
-  throws IOException, InterruptedException {
+  private List<HRegionLocation> splitTable(final Table t) throws IOException {
     // Split this table in two.
     Admin admin = TEST_UTIL.getAdmin();
     admin.split(t.getName());
@@ -572,8 +635,7 @@ public class TestFromClientSide {
    * @param t
    * @return Map of table regions; caller needs to check table actually split.
    */
-  private List<HRegionLocation> waitOnSplit(final Table t)
-  throws IOException {
+  private List<HRegionLocation> waitOnSplit(final Table t) throws IOException {
     try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(t.getName())) {
       List<HRegionLocation> regions = locator.getAllRegionLocations();
       int originalCount = regions.size();
@@ -585,8 +647,9 @@ public class TestFromClientSide {
           e.printStackTrace();
         }
         regions = locator.getAllRegionLocations();
-        if (regions.size() > originalCount)
+        if (regions.size() > originalCount) {
           break;
+        }
       }
       return regions;
     }
@@ -594,7 +657,7 @@ public class TestFromClientSide {
 
   @Test
   public void testSuperSimple() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, VALUE);
@@ -610,7 +673,7 @@ public class TestFromClientSide {
 
   @Test
   public void testMaxKeyValueSize() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     Configuration conf = TEST_UTIL.getConfiguration();
     String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY);
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -640,7 +703,7 @@ public class TestFromClientSide {
 
   @Test
   public void testFilters() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 10);
       byte[][] QUALIFIERS = {
@@ -677,7 +740,7 @@ public class TestFromClientSide {
 
   @Test
   public void testFilterWithLongCompartor() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 10);
       byte[][] values = new byte[10][];
@@ -709,7 +772,7 @@ public class TestFromClientSide {
 
   @Test
   public void testKeyOnlyFilter() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 10);
       byte[][] QUALIFIERS = {
@@ -747,7 +810,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testSimpleMissing() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 4);
 
@@ -858,7 +921,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testSingleRowMultipleFamily() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] ROWS = makeN(ROW, 3);
     byte [][] FAMILIES = makeNAscii(FAMILY, 10);
     byte [][] QUALIFIERS = makeN(QUALIFIER, 10);
@@ -1166,7 +1229,7 @@ public class TestFromClientSide {
 
   @Test(expected = IllegalArgumentException.class)
   public void testNullFamilyName() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     // Null family (should NOT work)
     TEST_UTIL.createTable(tableName, new byte[][]{null});
@@ -1175,7 +1238,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNullRowAndQualifier() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
 
@@ -1211,7 +1274,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNullEmptyQualifier() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
 
@@ -1249,7 +1312,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNullValue() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       // Null value
@@ -1284,7 +1347,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNullQualifier() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
 
       // Work for Put
@@ -1342,7 +1405,7 @@ public class TestFromClientSide {
 
   @Test
   public void testVersions() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     long [] STAMPS = makeStamps(20);
     byte [][] VALUES = makeNAscii(VALUE, 20);
@@ -1569,7 +1632,7 @@ public class TestFromClientSide {
   @Test
   @SuppressWarnings("checkstyle:MethodLength")
   public void testVersionLimits() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = makeNAscii(FAMILY, 3);
     int [] LIMITS = {1,3,5};
     long [] STAMPS = makeStamps(10);
@@ -1764,7 +1827,7 @@ public class TestFromClientSide {
   @Test
   public void testDeleteFamilyVersion() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
-      final TableName tableName = TableName.valueOf(name.getMethodName());
+      final TableName tableName = name.getTableName();
 
       byte[][] QUALIFIERS = makeNAscii(QUALIFIER, 1);
       byte[][] VALUES = makeN(VALUE, 5);
@@ -1804,7 +1867,7 @@ public class TestFromClientSide {
 
   @Test
   public void testDeleteFamilyVersionWithOtherDeletes() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 5);
     byte [][] VALUES = makeN(VALUE, 5);
@@ -1922,7 +1985,7 @@ public class TestFromClientSide {
 
   @Test
   public void testDeleteWithFailed() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     byte [][] FAMILIES = makeNAscii(FAMILY, 3);
     byte [][] VALUES = makeN(VALUE, 5);
@@ -1948,7 +2011,7 @@ public class TestFromClientSide {
 
   @Test
   public void testDeletes() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     byte [][] ROWS = makeNAscii(ROW, 6);
     byte [][] FAMILIES = makeNAscii(FAMILY, 3);
@@ -2254,7 +2317,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testBatchOperationsWithErrors() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 10)) {
 
       int NUM_OPS = 100;
@@ -2380,7 +2443,7 @@ public class TestFromClientSide {
     int numRows = 10;
     int numColsPerRow = 2000;
 
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     byte [][] ROWS = makeN(ROW, numRows);
     byte [][] QUALIFIERS = makeN(QUALIFIER, numColsPerRow);
@@ -2463,7 +2526,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testJiraTest861() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] VALUES = makeNAscii(VALUE, 7);
     long [] STAMPS = makeStamps(7);
 
@@ -2526,7 +2589,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testJiraTest33() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] VALUES = makeNAscii(VALUE, 7);
     long [] STAMPS = makeStamps(7);
 
@@ -2574,7 +2637,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testJiraTest1014() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
 
@@ -2598,7 +2661,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testJiraTest1182() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] VALUES = makeNAscii(VALUE, 7);
     long [] STAMPS = makeStamps(7);
 
@@ -2642,7 +2705,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testJiraTest52() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] VALUES = makeNAscii(VALUE, 7);
     long [] STAMPS = makeStamps(7);
 
@@ -2678,8 +2741,7 @@ public class TestFromClientSide {
 
   private void getVersionRangeAndVerifyGreaterThan(Table ht, byte [] row,
       byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
-      int start, int end)
-  throws IOException {
+      int start, int end) throws IOException {
     Get get = new Get(row);
     get.addColumn(family, qualifier);
     get.readVersions(Integer.MAX_VALUE);
@@ -2689,8 +2751,7 @@ public class TestFromClientSide {
   }
 
   private void getVersionRangeAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
-  throws IOException {
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
     Get get = new Get(row);
     get.addColumn(family, qualifier);
     get.readVersions(Integer.MAX_VALUE);
@@ -2700,8 +2761,7 @@ public class TestFromClientSide {
   }
 
   private void getAllVersionsAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
-  throws IOException {
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
     Get get = new Get(row);
     get.addColumn(family, qualifier);
     get.readVersions(Integer.MAX_VALUE);
@@ -2711,8 +2771,7 @@ public class TestFromClientSide {
 
   private void scanVersionRangeAndVerifyGreaterThan(Table ht, byte [] row,
       byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
-      int start, int end)
-  throws IOException {
+      int start, int end) throws IOException {
     Scan scan = new Scan(row);
     scan.addColumn(family, qualifier);
     scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2722,8 +2781,7 @@ public class TestFromClientSide {
   }
 
   private void scanVersionRangeAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
-  throws IOException {
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
     Scan scan = new Scan(row);
     scan.addColumn(family, qualifier);
     scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2733,8 +2791,7 @@ public class TestFromClientSide {
   }
 
   private void scanAllVersionsAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long [] stamps, byte [][] values, int start, int end)
-  throws IOException {
+      byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException {
     Scan scan = new Scan(row);
     scan.addColumn(family, qualifier);
     scan.setMaxVersions(Integer.MAX_VALUE);
@@ -2743,8 +2800,7 @@ public class TestFromClientSide {
   }
 
   private void getVersionAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long stamp, byte [] value)
-  throws Exception {
+      byte [] qualifier, long stamp, byte [] value) throws Exception {
     Get get = new Get(row);
     get.addColumn(family, qualifier);
     get.setTimestamp(stamp);
@@ -2754,8 +2810,7 @@ public class TestFromClientSide {
   }
 
   private void getVersionAndVerifyMissing(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long stamp)
-  throws Exception {
+      byte [] qualifier, long stamp) throws Exception {
     Get get = new Get(row);
     get.addColumn(family, qualifier);
     get.setTimestamp(stamp);
@@ -2765,8 +2820,7 @@ public class TestFromClientSide {
   }
 
   private void scanVersionAndVerify(Table ht, byte [] row, byte [] family,
-      byte [] qualifier, long stamp, byte [] value)
-  throws Exception {
+      byte [] qualifier, long stamp, byte [] value) throws Exception {
     Scan scan = new Scan(row);
     scan.addColumn(family, qualifier);
     scan.setTimestamp(stamp);
@@ -2776,8 +2830,7 @@ public class TestFromClientSide {
   }
 
   private void scanVersionAndVerifyMissing(Table ht, byte [] row,
-      byte [] family, byte [] qualifier, long stamp)
-  throws Exception {
+      byte [] family, byte [] qualifier, long stamp) throws Exception {
     Scan scan = new Scan(row);
     scan.addColumn(family, qualifier);
     scan.setTimestamp(stamp);
@@ -2786,10 +2839,7 @@ public class TestFromClientSide {
     assertNullResult(result);
   }
 
-  private void getTestNull(Table ht, byte [] row, byte [] family,
-      byte [] value)
-  throws Exception {
-
+  private void getTestNull(Table ht, byte [] row, byte [] family, byte [] value) throws Exception {
     Get get = new Get(row);
     get.addColumn(family, null);
     Result result = ht.get(get);
@@ -2866,9 +2916,7 @@ public class TestFromClientSide {
   }
 
   private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES,
-      byte [][] QUALIFIERS, byte [][] VALUES)
-  throws Exception {
-
+      byte [][] QUALIFIERS, byte [][] VALUES) throws Exception {
     // Single column from memstore
     Get get = new Get(ROWS[0]);
     get.addColumn(FAMILIES[4], QUALIFIERS[0]);
@@ -2923,7 +2971,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
-    });
+        });
 
     // Multiple columns from everywhere storefile, many family, wildcard
     get = new Get(ROWS[0]);
@@ -2939,7 +2987,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
-    });
+        });
 
     // Everything
     get = new Get(ROWS[0]);
@@ -2947,7 +2995,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
-    });
+        });
 
     // Get around inserted columns
 
@@ -2964,9 +3012,7 @@ public class TestFromClientSide {
   }
 
   private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES,
-      byte [][] QUALIFIERS, byte [][] VALUES)
-  throws Exception {
-
+      byte [][] QUALIFIERS, byte [][] VALUES) throws Exception {
     // Single column from memstore
     Scan scan = new Scan();
     scan.addColumn(FAMILIES[4], QUALIFIERS[0]);
@@ -3021,7 +3067,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
-    });
+        });
 
     // Multiple columns from everywhere storefile, many family, wildcard
     scan = new Scan();
@@ -3037,7 +3083,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}
-    });
+        });
 
     // Everything
     scan = new Scan();
@@ -3045,7 +3091,7 @@ public class TestFromClientSide {
     assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES,
         new int [][] {
           {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0}
-    });
+        });
 
     // Scan around inserted columns
 
@@ -3065,13 +3111,9 @@ public class TestFromClientSide {
    * Expects family and qualifier arrays to be valid for at least
    * the range:  idx-2 < idx < idx+2
    */
-  private void getVerifySingleColumn(Table ht,
-      byte [][] ROWS, int ROWIDX,
-      byte [][] FAMILIES, int FAMILYIDX,
-      byte [][] QUALIFIERS, int QUALIFIERIDX,
-      byte [][] VALUES, int VALUEIDX)
-  throws Exception {
-
+  private void getVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+      int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX)
+      throws Exception {
     Get get = new Get(ROWS[ROWIDX]);
     Result result = ht.get(get);
     assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX],
@@ -3123,13 +3165,9 @@ public class TestFromClientSide {
    * the range:  idx-2 to idx+2
    * Expects row array to be valid for at least idx to idx+2
    */
-  private void scanVerifySingleColumn(Table ht,
-      byte [][] ROWS, int ROWIDX,
-      byte [][] FAMILIES, int FAMILYIDX,
-      byte [][] QUALIFIERS, int QUALIFIERIDX,
-      byte [][] VALUES, int VALUEIDX)
-  throws Exception {
-
+  private void scanVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+      int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX)
+      throws Exception {
     Scan scan = new Scan();
     Result result = getSingleScanResult(ht, scan);
     assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX],
@@ -3183,12 +3221,8 @@ public class TestFromClientSide {
    * Verify we do not read any values by accident around a single column
    * Same requirements as getVerifySingleColumn
    */
-  private void getVerifySingleEmpty(Table ht,
-      byte [][] ROWS, int ROWIDX,
-      byte [][] FAMILIES, int FAMILYIDX,
-      byte [][] QUALIFIERS, int QUALIFIERIDX)
-  throws Exception {
-
+  private void getVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+      int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception {
     Get get = new Get(ROWS[ROWIDX]);
     get.addFamily(FAMILIES[4]);
     get.addColumn(FAMILIES[4], QUALIFIERS[1]);
@@ -3214,12 +3248,8 @@ public class TestFromClientSide {
 
   }
 
-  private void scanVerifySingleEmpty(Table ht,
-      byte [][] ROWS, int ROWIDX,
-      byte [][] FAMILIES, int FAMILYIDX,
-      byte [][] QUALIFIERS, int QUALIFIERIDX)
-  throws Exception {
-
+  private void scanVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES,
+      int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception {
     Scan scan = new Scan(ROWS[ROWIDX+1]);
     Result result = getSingleScanResult(ht, scan);
     assertNullResult(result);
@@ -3244,9 +3274,7 @@ public class TestFromClientSide {
   // Verifiers
   //
 
-  private void assertKey(Cell key, byte [] row, byte [] family,
-      byte [] qualifier, byte [] value)
-  throws Exception {
+  private void assertKey(Cell key, byte [] row, byte [] family, byte [] qualifier, byte [] value) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
         equals(row, CellUtil.cloneRow(key)));
@@ -3262,8 +3290,7 @@ public class TestFromClientSide {
   }
 
   static void assertIncrementKey(Cell key, byte [] row, byte [] family,
-      byte [] qualifier, long value)
-  throws Exception {
+      byte [] qualifier, long value) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
         equals(row, CellUtil.cloneRow(key)));
@@ -3284,9 +3311,7 @@ public class TestFromClientSide {
   }
 
   private void assertNResult(Result result, byte [] row,
-      byte [][] families, byte [][] qualifiers, byte [][] values,
-      int [][] idxs)
-  throws Exception {
+      byte [][] families, byte [][] qualifiers, byte [][] values, int [][] idxs) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
@@ -3318,8 +3343,7 @@ public class TestFromClientSide {
 
   private void assertNResult(Result result, byte [] row,
       byte [] family, byte [] qualifier, long [] stamps, byte [][] values,
-      int start, int end)
-  throws IOException {
+      int start, int end) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
@@ -3353,8 +3377,7 @@ public class TestFromClientSide {
    */
   private void assertDoubleResult(Result result, byte [] row,
       byte [] familyA, byte [] qualifierA, byte [] valueA,
-      byte [] familyB, byte [] qualifierB, byte [] valueB)
-  throws Exception {
+      byte [] familyB, byte [] qualifierB, byte [] valueB) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
@@ -3384,8 +3407,7 @@ public class TestFromClientSide {
   }
 
   private void assertSingleResult(Result result, byte [] row, byte [] family,
-      byte [] qualifier, byte [] value)
-  throws Exception {
+      byte [] qualifier, byte [] value) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
@@ -3423,8 +3445,7 @@ public class TestFromClientSide {
   }
 
   private void assertSingleResult(Result result, byte [] row, byte [] family,
-      byte [] qualifier, long ts, byte [] value)
-  throws Exception {
+      byte [] qualifier, long ts, byte [] value) {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
@@ -3515,7 +3536,7 @@ public class TestFromClientSide {
 
   @Test
   public void testDuplicateVersions() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     long [] STAMPS = makeStamps(20);
     byte [][] VALUES = makeNAscii(VALUE, 20);
@@ -3738,7 +3759,7 @@ public class TestFromClientSide {
 
   @Test
   public void testUpdates() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
 
       // Write a column with values at timestamp 1, 2 and 3
@@ -3788,7 +3809,7 @@ public class TestFromClientSide {
 
   @Test
   public void testUpdatesWithMajorCompaction() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10);
         Admin admin = TEST_UTIL.getAdmin()) {
 
@@ -3849,7 +3870,7 @@ public class TestFromClientSide {
 
   @Test
   public void testMajorCompactionBetweenTwoUpdates() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10);
         Admin admin = TEST_UTIL.getAdmin()) {
 
@@ -3916,7 +3937,7 @@ public class TestFromClientSide {
 
   @Test
   public void testGet_EmptyTable() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       Get get = new Get(ROW);
       get.addFamily(FAMILY);
       Result r = table.get(get);
@@ -3926,7 +3947,7 @@ public class TestFromClientSide {
 
   @Test
   public void testGet_NullQualifier() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, VALUE);
       table.put(put);
@@ -3950,7 +3971,7 @@ public class TestFromClientSide {
 
   @Test
   public void testGet_NonExistentRow() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, VALUE);
       table.put(put);
@@ -3978,7 +3999,7 @@ public class TestFromClientSide {
     final byte [] row1 = Bytes.toBytes("row1");
     final byte [] row2 = Bytes.toBytes("row2");
     final byte [] value = Bytes.toBytes("abcd");
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
         new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) {
       Put put = new Put(row1);
       put.addColumn(CONTENTS_FAMILY, null, value);
@@ -4017,7 +4038,7 @@ public class TestFromClientSide {
   public void testPutNoCF() throws IOException {
     final byte[] BAD_FAM = Bytes.toBytes("BAD_CF");
     final byte[] VAL = Bytes.toBytes(100);
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       boolean caughtNSCFE = false;
 
       try {
@@ -4037,7 +4058,7 @@ public class TestFromClientSide {
     final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam");
     final int NB_BATCH_ROWS = 10;
     final byte[] value = Bytes.toBytes("abcd");
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
       new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY })) {
       ArrayList<Put> rowsUpdate = new ArrayList<Put>();
       for (int i = 0; i < NB_BATCH_ROWS; i++) {
@@ -4067,7 +4088,7 @@ public class TestFromClientSide {
     final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam");
     final byte[] value = Bytes.toBytes("abcd");
     final int NB_BATCH_ROWS = 10;
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
         new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) {
       ArrayList<Put> rowsUpdate = new ArrayList<Put>();
       for (int i = 0; i < NB_BATCH_ROWS * 10; i++) {
@@ -4130,7 +4151,7 @@ public class TestFromClientSide {
     final byte [] FAM1 = Bytes.toBytes("fam1");
     final byte [] FAM2 = Bytes.toBytes("fam2");
     // Open table
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
       new byte [][] {FAM1, FAM2})) {
       // Insert some values
       Put put = new Put(ROW);
@@ -4213,9 +4234,10 @@ public class TestFromClientSide {
 
   @Test
   public void testListTables() throws IOException, InterruptedException {
-    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
-    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
-    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
+    final String testTableName = name.getTableName().toString();
+    final TableName tableName1 = TableName.valueOf(testTableName + "1");
+    final TableName tableName2 = TableName.valueOf(testTableName + "2");
+    final TableName tableName3 = TableName.valueOf(testTableName + "3");
     TableName [] tables = new TableName[] { tableName1, tableName2, tableName3 };
     for (int i = 0; i < tables.length; i++) {
       TEST_UTIL.createTable(tables[i], FAMILY);
@@ -4244,7 +4266,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testUnmanagedHConnection() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
     try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
         Table t = conn.getTable(tableName);
@@ -4260,7 +4282,13 @@ public class TestFromClientSide {
    */
   @Test
   public void testUnmanagedHConnectionReconnect() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    Configuration conf = TEST_UTIL.getConfiguration();
+    Class registryImpl = conf.getClass(
+        HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+    // This test does not make sense for MasterRegistry since it stops the only master in the
+    // cluster and starts a new master without populating the underlying config for the connection.
+    Assume.assumeFalse(registryImpl.equals(MasterRegistry.class));
+    final TableName tableName = name.getTableName();
     TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
     try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
       try (Table t = conn.getTable(tableName); Admin admin = conn.getAdmin()) {
@@ -4290,8 +4318,9 @@ public class TestFromClientSide {
 
   @Test
   public void testMiscHTableStuff() throws IOException {
-    final TableName tableAname = TableName.valueOf(name.getMethodName() + "A");
-    final TableName tableBname = TableName.valueOf(name.getMethodName() + "B");
+    final String testTableName = name.getTableName().toString();
+    final TableName tableAname = TableName.valueOf(testTableName + "A");
+    final TableName tableBname = TableName.valueOf(testTableName + "B");
     final byte[] attrName = Bytes.toBytes("TESTATTR");
     final byte[] attrValue = Bytes.toBytes("somevalue");
     byte[] value = Bytes.toBytes("value");
@@ -4340,8 +4369,9 @@ public class TestFromClientSide {
       // add a user attribute to HTD
       desc.setValue(attrName, attrValue);
       // add a user attribute to HCD
-      for (HColumnDescriptor c : desc.getFamilies())
+      for (HColumnDescriptor c : desc.getFamilies()) {
         c.setValue(attrName, attrValue);
+      }
       // update metadata for all regions of this table
       admin.modifyTable(desc);
       // enable the table
@@ -4365,7 +4395,7 @@ public class TestFromClientSide {
 
   @Test
   public void testGetClosestRowBefore() throws IOException, InterruptedException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     final byte[] firstRow = Bytes.toBytes("row111");
     final byte[] secondRow = Bytes.toBytes("row222");
     final byte[] thirdRow = Bytes.toBytes("row333");
@@ -4492,7 +4522,7 @@ public class TestFromClientSide {
   @Test
   public void testMultiRowMutation() throws Exception {
     LOG.info("Starting testMultiRowMutation");
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     final byte [] ROW1 = Bytes.toBytes("testRow1");
 
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -4524,7 +4554,7 @@ public class TestFromClientSide {
   @Test
   public void testRowMutation() throws Exception {
     LOG.info("Starting testRowMutation");
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") };
       RowMutations arm = new RowMutations(ROW);
@@ -4574,7 +4604,7 @@ public class TestFromClientSide {
   @Test
   public void testBatchAppendWithReturnResultFalse() throws Exception {
     LOG.info("Starting testBatchAppendWithReturnResultFalse");
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
       Append append1 = new Append(Bytes.toBytes("row1"));
       append1.setReturnResults(false);
@@ -4598,7 +4628,7 @@ public class TestFromClientSide {
   @Test
   public void testAppend() throws Exception {
     LOG.info("Starting testAppend");
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[] v1 = Bytes.toBytes("42");
       byte[] v2 = Bytes.toBytes("23");
@@ -4690,7 +4720,7 @@ public class TestFromClientSide {
 
   @Test
   public void testClientPoolRoundRobin() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     int poolSize = 3;
     int numVersions = poolSize * 2;
@@ -4728,7 +4758,7 @@ public class TestFromClientSide {
 
   @Ignore ("Flakey: HBASE-8989") @Test
   public void testClientPoolThreadLocal() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     int poolSize = Integer.MAX_VALUE;
     int numVersions = 3;
@@ -4814,7 +4844,7 @@ public class TestFromClientSide {
     final byte [] anotherrow = Bytes.toBytes("anotherrow");
     final byte [] value2 = Bytes.toBytes("abcd");
 
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       Put put1 = new Put(ROW);
       put1.addColumn(FAMILY, QUALIFIER, VALUE);
 
@@ -4852,7 +4882,7 @@ public class TestFromClientSide {
 
   @Test
   public void testCheckAndMutateWithTimeRange() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       final long ts = System.currentTimeMillis() / 2;
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
@@ -4948,7 +4978,7 @@ public class TestFromClientSide {
     final byte [] value3 = Bytes.toBytes("cccc");
     final byte [] value4 = Bytes.toBytes("dddd");
 
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
 
       Put put2 = new Put(ROW);
       put2.addColumn(FAMILY, QUALIFIER, value2);
@@ -5030,7 +5060,7 @@ public class TestFromClientSide {
   public void testCheckAndDelete() throws IOException {
     final byte [] value1 = Bytes.toBytes("aaaa");
 
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
         FAMILY)) {
 
       Put put = new Put(ROW);
@@ -5053,7 +5083,7 @@ public class TestFromClientSide {
     final byte [] value3 = Bytes.toBytes("cccc");
     final byte [] value4 = Bytes.toBytes("dddd");
 
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
+    try (Table table = TEST_UTIL.createTable(name.getTableName(),
         FAMILY)) {
 
       Put put2 = new Put(ROW);
@@ -5143,9 +5173,9 @@ public class TestFromClientSide {
   * Test ScanMetrics
   */
   @Test
-  @SuppressWarnings ("unused")
+  @SuppressWarnings({"unused", "checkstyle:EmptyBlock"})
   public void testScanMetrics() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     // Set up test table:
     // Create table:
@@ -5198,7 +5228,6 @@ public class TestFromClientSide {
         // the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
         for (Result result : scanner.next(numRecords - 1)) {
         }
-
         ScanMetrics scanMetrics = scanner.getScanMetrics();
         assertEquals("Did not access all the regions in the table", numOfRegions,
                 scanMetrics.countOfRegions.get());
@@ -5279,7 +5308,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testCacheOnWriteEvictOnClose() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [] data = Bytes.toBytes("data");
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
       try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
@@ -5403,7 +5432,7 @@ public class TestFromClientSide {
    */
   public void testNonCachedGetRegionLocation() throws Exception {
     // Test Initialization.
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [] family1 = Bytes.toBytes("f1");
     byte [] family2 = Bytes.toBytes("f2");
     try (Table table = TEST_UTIL.createTable(tableName, new byte[][] {family1, family2}, 10);
@@ -5452,7 +5481,7 @@ public class TestFromClientSide {
     // Test Initialization.
     byte [] startKey = Bytes.toBytes("ddc");
     byte [] endKey = Bytes.toBytes("mmm");
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     TEST_UTIL.createMultiRegionTable(tableName, new byte[][] { FAMILY }, 10);
 
     int numOfRegions = -1;
@@ -5522,7 +5551,7 @@ public class TestFromClientSide {
 
   @Test
   public void testJira6912() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] {FAMILY}, 10)) {
 
       List<Put> puts = new ArrayList<Put>();
@@ -5551,7 +5580,7 @@ public class TestFromClientSide {
 
   @Test
   public void testScan_NullQualifier() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, VALUE);
       table.put(put);
@@ -5581,7 +5610,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNegativeTimestamp() throws IOException {
-    try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) {
+    try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) {
 
       try {
         Put put = new Put(ROW, -1);
@@ -5642,7 +5671,7 @@ public class TestFromClientSide {
 
   @Test
   public void testRawScanRespectsVersions() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[] row = Bytes.toBytes("row");
 
@@ -5716,7 +5745,7 @@ public class TestFromClientSide {
   @Test
   public void testEmptyFilterList() throws Exception {
     // Test Initialization.
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
 
       // Insert one row each region
@@ -5756,7 +5785,7 @@ public class TestFromClientSide {
   @Test
   public void testSmallScan() throws Exception {
     // Test Initialization.
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
 
       // Insert one row each region
@@ -5794,7 +5823,7 @@ public class TestFromClientSide {
 
   @Test
   public void testSuperSimpleWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000"));
       put.addColumn(FAMILY, QUALIFIER, VALUE);
@@ -5839,7 +5868,7 @@ public class TestFromClientSide {
 
   @Test
   public void testFiltersWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 10);
       byte[][] QUALIFIERS = {Bytes.toBytes("col0-<d2v1>-<d3v2>"),
@@ -5882,7 +5911,7 @@ public class TestFromClientSide {
 
   @Test
   public void testKeyOnlyFilterWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 10);
       byte[][] QUALIFIERS = {Bytes.toBytes("col0-<d2v1>-<d3v2>"),
@@ -5923,7 +5952,7 @@ public class TestFromClientSide {
    */
   @Test
   public void testSimpleMissingWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       byte[][] ROWS = makeN(ROW, 4);
 
@@ -5988,7 +6017,7 @@ public class TestFromClientSide {
 
   @Test
   public void testNullWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
       // Null qualifier (should work)
       Put put = new Put(ROW);
@@ -6001,7 +6030,8 @@ public class TestFromClientSide {
     }
 
     // Use a new table
-    try (Table ht = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName() + "2"), FAMILY)) {
+    try (Table ht =
+       TEST_UTIL.createTable(TableName.valueOf(name.getTableName().toString() + "2"), FAMILY)) {
       // Empty qualifier, byte[0] instead of null (should work)
       Put put = new Put(ROW);
       put.addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE);
@@ -6027,7 +6057,7 @@ public class TestFromClientSide {
   @Test
   @SuppressWarnings("checkstyle:MethodLength")
   public void testDeletesWithReverseScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte[][] ROWS = makeNAscii(ROW, 6);
     byte[][] FAMILIES = makeNAscii(FAMILY, 3);
     byte[][] VALUES = makeN(VALUE, 5);
@@ -6213,7 +6243,7 @@ public class TestFromClientSide {
   @Test
   public void testReversedScanUnderMultiRegions() throws Exception {
     // Test Initialization.
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
     byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
         Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
@@ -6274,7 +6304,7 @@ public class TestFromClientSide {
   @Test
   public void testSmallReversedScanUnderMultiRegions() throws Exception {
     // Test Initialization.
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte[][] splitRows = new byte[][]{
         Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"),
         Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")};
@@ -6494,7 +6524,7 @@ public class TestFromClientSide {
   @Test
   public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
-      final TableName tableName = TableName.valueOf(name.getMethodName());
+      final TableName tableName = name.getTableName();
 
       byte[][] VALUES = makeN(VALUE, 5);
       long[] ts = {1000, 2000, 3000, 4000, 5000};
@@ -6540,7 +6570,7 @@ public class TestFromClientSide {
   @Test
   public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
-      final TableName tableName = TableName.valueOf(name.getMethodName());
+      final TableName tableName = name.getTableName();
 
       byte[][] VALUES = makeN(VALUE, 5);
       long[] ts = {1000, 2000, 3000, 4000, 5000};
@@ -6603,7 +6633,7 @@ public class TestFromClientSide {
   @Test
   public void testReadWithFilter() throws Exception {
     try (Admin admin = TEST_UTIL.getAdmin()) {
-      final TableName tableName = TableName.valueOf(name.getMethodName());
+      final TableName tableName = name.getTableName();
       try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 3)) {
 
         byte[] VALUEA = Bytes.toBytes("value-a");
@@ -6690,7 +6720,7 @@ public class TestFromClientSide {
 
   @Test
   public void testCellUtilTypeMethods() throws IOException {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
 
       final byte[] row = Bytes.toBytes("p");
@@ -6744,7 +6774,7 @@ public class TestFromClientSide {
 
   @Test(expected = DoNotRetryIOException.class)
   public void testCreateTableWithZeroRegionReplicas() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
         .setRegionReplication(0)
@@ -6755,7 +6785,7 @@ public class TestFromClientSide {
 
   @Test(expected = DoNotRetryIOException.class)
   public void testModifyTableWithZeroRegionReplicas() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
         .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
         .build();
@@ -6770,13 +6800,13 @@ public class TestFromClientSide {
 
   @Test(timeout = 60000)
   public void testModifyTableWithMemstoreData() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     createTableAndValidateTableSchemaModification(tableName, true);
   }
 
   @Test(timeout = 60000)
   public void testDeleteCFWithMemstoreData() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     createTableAndValidateTableSchemaModification(tableName, false);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
index 37d0135..8845f9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
@@ -17,14 +17,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.Arrays;
+import java.util.Collection;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized;
 
 /**
  * Test all client operations with a coprocessor that just implements the default flush/compact/scan
@@ -32,13 +34,24 @@ import org.junit.experimental.categories.Category;
  */
 @Category({ LargeTests.class, ClientTests.class })
 public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class);
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    initialize(MultiRowMutationEndpoint.class, NoOpScanPolicyObserver.class);
+  // Override the parameters from the parent class. We just want to run it for the default
+  // param combination.
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] { { ZKConnectionRegistry.class, 1 } });
+  }
+
+  /** Calls the inherited initialize(...) with both coprocessors while TEST_UTIL is unset. */
+  public TestFromClientSideWithCoprocessor(Class<? extends ConnectionRegistry> registry,
+      int numHedgedReqs) throws Exception {
+    if (TEST_UTIL == null) {
+      // It is ok to initialize once because the test is parameterized for a single dimension.
+      initialize(registry, numHedgedReqs, NoOpScanPolicyObserver.class,
+          MultiRowMutationEndpoint.class);
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
new file mode 100644
index 0000000..335f968
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests {@link MasterRegistry} against a mini cluster running three masters. */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestMasterRegistry {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMasterRegistry.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
+    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+    builder.numMasters(3).numRegionServers(3);
+    TEST_UTIL.startMiniCluster(builder.build());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Generates a comma-separated string of dummy master addresses in host:port format. Entries
+   * at odd indices have no port number, and every entry carries a leading space.
+   */
+  private static String generateDummyMastersList(int size) {
+    List<String> masters = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : ""));
+    }
+    return String.join(",", masters);
+  }
+
+  /**
+   * Makes sure the master registry parses the master end points in the configuration correctly.
+   */
+  @Test public void testMasterAddressParsing() {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    int numMasters = 10;
+    conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
+    try (MasterRegistry registry = new MasterRegistry(conf)) {
+      List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
+      // Half of them would be without a port, duplicates are removed.
+      assertEquals(numMasters / 2 + 1, parsedMasters.size());
+      // Sort in the increasing order of port numbers.
+      parsedMasters.sort(Comparator.comparingInt(ServerName::getPort));
+      for (int i = 0; i < parsedMasters.size(); i++) {
+        ServerName sn = parsedMasters.get(i);
+        assertEquals("localhost", sn.getHostname());
+        if (i == parsedMasters.size() - 1) {
+          // Last entry should be the one with default port.
+          assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort());
+        } else {
+          assertEquals(1000 + (2 * i), sn.getPort());
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks the registry RPC results against the active master for hedged fan-outs of 1 to 3.
+   */
+  @Test public void testRegistryRPCs() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
+      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, numHedgedReqs > 1);
+      conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+      try (MasterRegistry registry = new MasterRegistry(conf)) {
+        assertEquals(activeMaster.getClusterId(), registry.getClusterId().get());
+        assertEquals(activeMaster.getServerName(), registry.getActiveMaster().get());
+        // Sort copies so that neither the registry result nor the master's list is reordered.
+        List<HRegionLocation> metaLocations = new ArrayList<>(
+            Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations()));
+        List<HRegionLocation> actualMetaLocations = new ArrayList<>(
+            activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get());
+        Collections.sort(metaLocations);
+        Collections.sort(actualMetaLocations);
+        assertEquals(actualMetaLocations, metaLocations);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index c3da587..796ebb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -29,9 +29,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -44,35 +45,38 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.TestTableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
- * A client-side test, mostly testing scanners with various parameters.
+ * A client-side test, mostly testing scanners with various parameters. Parameterized on different
+ * registry implementations.
  */
 @Category({MediumTests.class, ClientTests.class})
+@RunWith(Parameterized.class)
 public class TestScannersFromClientSide {
 
   @ClassRule
@@ -81,38 +85,80 @@ public class TestScannersFromClientSide {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
 
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static HBaseTestingUtility TEST_UTIL;
   private static byte [] ROW = Bytes.toBytes("testRow");
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
 
   @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
-    TEST_UTIL.startMiniCluster(3);
-  }
+  public TestTableName name = new TestTableName();
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
+    if (TEST_UTIL != null) { // stays null until the constructor assigns it
+      TEST_UTIL.shutdownMiniCluster();
+    }
   }
 
-  @Before
-  public void setUp() throws Exception {
-    // Nothing to do.
+  /** Registry implementation and hedged request fan-out for each parameterized run. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] {
+        { MasterRegistry.class, 1 }, { MasterRegistry.class, 2 },
+        { ZKConnectionRegistry.class, 1 }
+    });
   }
 
   /**
-   * @throws java.lang.Exception
+   * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
+   * there is no easy way to restart the test cluster after each parameterized run. Annotation
+   * BeforeParam does not work either because it runs before parameterization and hence does not
+   * have access to the test parameters (which is weird).
+   *
+   * This *hack* checks if the current instance of test cluster configuration has the passed
+   * parameterized configs. In such a case, we can just reuse the cluster for test and do not need
+   * to initialize from scratch. While this is a hack, it saves a ton of time for the full
+   * test and de-flakes it.
    */
-  @After
-  public void tearDown() throws Exception {
-    // Nothing to do.
+  private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
+    // This runs from the constructor for every test instance, however we only want to reset the
+    // cluster state at the end of every parameterized run.
+    if (TEST_UTIL == null) {
+      return false;
+    }
+    Configuration conf = TEST_UTIL.getConfiguration();
+    Class<?> confClass = conf.getClass(
+        HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
+    int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
+        HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
+    return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
+  }
+
+  /** Starts a cluster for this parameter combination unless a matching one is already up. */
+  public TestScannersFromClientSide(Class<? extends ConnectionRegistry> registryImpl,
+      int numHedgedReqs) throws Exception {
+    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
+      return;
+    }
+    if (TEST_UTIL != null) {
+      // We reached the end of a parameterized run, clean up the cluster.
+      TEST_UTIL.shutdownMiniCluster();
+    }
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
+        ConnectionRegistry.class);
+    Preconditions.checkArgument(numHedgedReqs >= 1);
+    conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, numHedgedReqs > 1);
+    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+    // Multiple masters needed only when hedged reads for master registry are enabled.
+    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
+    TEST_UTIL.startMiniCluster(builder.build());
   }
 
   /**
@@ -120,7 +166,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testScanBatch() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
 
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -190,7 +236,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testMaxResultSizeIsSetToDefault() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);
 
     // The max result size we expect the scan to use by default.
@@ -259,7 +305,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testSmallScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
 
     int numRows = 10;
     byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
@@ -292,7 +338,8 @@ public class TestScannersFromClientSide {
   /**
    * Run through a variety of test configurations with a small scan
    */
-  private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
+  private void testSmallScan(
+      Table table, boolean reversed, int rows, int columns) throws Exception {
     Scan baseScan = new Scan();
     baseScan.setReversed(reversed);
     baseScan.setSmall(true);
@@ -334,7 +381,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetMaxResults() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -408,8 +455,8 @@ public class TestScannersFromClientSide {
       kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
     }
     for (int i=0; i < 2; i++) {
-        kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
-      }
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
     for (int i=10; i < 20; i++) {
       kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
     }
@@ -452,7 +499,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testScanMaxResults() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -500,7 +547,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetRowOffset() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
 
@@ -519,7 +566,9 @@ public class TestScannersFromClientSide {
       KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
       put.add(kv);
       // skipping first two kvs
-      if (i < 2) continue;
+      if (i < 2) {
+        continue;
+      }
       kvListExp.add(kv);
     }
     ht.put(put);
@@ -590,7 +639,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testScanRawDeleteFamilyVersion() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     TEST_UTIL.createTable(tableName, FAMILY);
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(RPC_CODEC_CONF_KEY, "");
@@ -618,7 +667,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testScanOnReopenedRegion() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
 
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -693,8 +742,9 @@ public class TestScannersFromClientSide {
     LOG.info(msg);
     LOG.info("Expected count: " + expKvList.size());
     LOG.info("Actual count: " + result.size());
-    if (expKvList.isEmpty())
+    if (expKvList.isEmpty()) {
       return;
+    }
 
     int i = 0;
     for (Cell kv : result.rawCells()) {
@@ -715,7 +765,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testReadExpiredDataForRawScan() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     long ts = System.currentTimeMillis() - 10000;
     byte[] value = Bytes.toBytes("expired");
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -735,7 +785,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testScanWithColumnsAndFilterAndVersion() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
       for (int i = 0; i < 4; i++) {
         Put put = new Put(ROW);
@@ -757,7 +807,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testScanWithSameStartRowStopRow() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
 
@@ -794,7 +844,7 @@ public class TestScannersFromClientSide {
 
   @Test
   public void testReverseScanWithFlush() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     final int BATCH_SIZE = 10;
     final int ROWS_TO_INSERT = 100;
     final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ac0d356..2797df3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -30,22 +30,31 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -54,14 +63,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
  * Some basic ipc tests.
@@ -232,7 +233,6 @@ public abstract class AbstractTestIPC {
   /**
    * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
    * remoteAddress set to its Call Object
-   * @throws ServiceException
    */
   @Test
   public void testRpcServerForNotNullRemoteAddressInCallObject()
@@ -363,6 +363,104 @@ public abstract class AbstractTestIPC {
     }
   }
 
+  /**
+   * Echoes "hello" through a hedged stub over 5 running and 3 failing servers, once per fan out
+   * value from 1 to 8, and expects a non-null echoed response each time.
+   */
+  @Test
+  public void testHedgedAsyncEcho() throws Exception {
+    // Skipped for TestBlockingIPC: hedging is not supported for blocking connection types.
+    Assume.assumeFalse(this instanceof TestBlockingIPC);
+    List<RpcServer> rpcServers = new ArrayList<>();
+    List<InetSocketAddress> addresses = new ArrayList<>();
+    // Create a mix of running and failing servers, each bound to localhost with port 0.
+    final int numRunningServers = 5;
+    final int numFailingServers = 3;
+    final int numServers = numRunningServers + numFailingServers;
+    for (int i = 0; i < numRunningServers; i++) {
+      RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    for (int i = 0; i < numFailingServers; i++) {
+      RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    Configuration conf = HBaseConfiguration.create();
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      // Try every fan out value from 1 through numServers (inclusive).
+      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+        // Mutate the conf the client was created with (ok for a test), then build a new stub.
+        LOG.debug("Testing with request fan out: " + reqFanOut);
+        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+        Interface stub = newStub(client, addresses);
+        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
+        stub.echo(new HBaseRpcControllerImpl(),
+            EchoRequestProto.newBuilder().setMessage("hello").build(), done);
+        TestProtos.EchoResponseProto responseProto = done.get();
+        assertNotNull(responseProto);
+        assertEquals("hello", responseProto.getMessage());
+        LOG.debug("Ended test with request fan out: " + reqFanOut);
+      }
+    } finally {
+      for (RpcServer rpcServer: rpcServers) {
+        rpcServer.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testHedgedAsyncTimeouts() throws Exception {
+    // Skipped for TestBlockingIPC: hedging is not supported for blocking connection types.
+    Assume.assumeFalse(this instanceof TestBlockingIPC);
+    List<RpcServer> rpcServers = new ArrayList<>();
+    List<InetSocketAddress> addresses = new ArrayList<>();
+    final int numServers = 3;
+    for (int i = 0; i < numServers; i++) {
+      RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+          SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    Configuration conf = HBaseConfiguration.create();
+    int timeout = 100;
+    int pauseTime = 1000;
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      // Try every fan out value from 1 through numServers (inclusive).
+      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+        // Mutate the conf the client was created with (ok for a test), then build a new stub.
+        LOG.debug("Testing with request fan out: " + reqFanOut);
+        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+        Interface stub = newStub(client, addresses);
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        pcrc.setCallTimeout(timeout);
+        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
+        assertNull(callback.get());
+        // pauseTime exceeds the call timeout, so the controller must hold a timeout exception.
+        assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
+        LOG.debug("Ended test with request fan out: " + reqFanOut);
+      }
+    } finally {
+      for (RpcServer rpcServer: rpcServers) {
+        rpcServer.stop();
+      }
+    }
+  }
+
+
   @Test
   public void testAsyncRemoteError() throws IOException {
     AbstractRpcClient<?> client = createRpcClient(CONF);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
index d8a2d34..6adfa46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -17,21 +17,23 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-
+import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -41,8 +43,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Threads;
 
 @InterfaceAudience.Private
 public class TestProtobufRpcServiceImpl implements BlockingInterface {
@@ -67,6 +67,17 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
       User.getCurrent(), 0));
   }
 
+  public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
+      throws IOException {
+    Set<ServerName> serverNames = new HashSet<>(); // targets handed to the hedged channel
+    for (InetSocketAddress addr: addrs) {
+      serverNames.add(ServerName.valueOf(
+          addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
+    }
+    return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
+        serverNames, User.getCurrent(), 0)); // NOTE(review): 0 is presumably the rpc timeout
+  }
+
   @Override
   public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
       throws ServiceException {