You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/05/03 05:58:35 UTC

svn commit: r1478639 [2/10] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java...

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1478639&r1=1478638&r2=1478639&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri May  3 03:58:33 2013
@@ -20,15 +20,9 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
 import java.net.SocketException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -59,12 +53,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterAdminProtocol;
-import org.apache.hadoop.hbase.MasterMonitorProtocol;
-import org.apache.hadoop.hbase.MasterProtocol;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -77,26 +66,95 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
-import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
-import org.apache.hadoop.hbase.ipc.RpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Triple;
-import org.apache.hadoop.hbase.zookeeper.*;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -105,7 +163,8 @@ import com.google.protobuf.ServiceExcept
  * {@link Configuration}; all invocations of {@link #getConnection(Configuration)}
  * that pass the same {@link Configuration} instance will be returned the same
  * {@link  HConnection} instance (Adding properties to a Configuration
- * instance does not change its object identity).  Sharing {@link HConnection}
+ * instance does not change its object identity; for more on how this is done see
+ * {@link HConnectionKey}).  Sharing {@link HConnection}
  * instances is usually what you want; all clients of the {@link HConnection}
  * instances share the HConnections' cache of Region locations rather than each
  * having to discover for itself the location of meta, etc.  It makes
@@ -116,11 +175,9 @@ import com.google.protobuf.ServiceExcept
  * implemented atop Hadoop RPC and as of this writing, Hadoop RPC does a
  * connection per cluster-member, exclusively).
  *
- * <p>But sharing connections
- * makes clean up of {@link HConnection} instances a little awkward.  Currently,
- * clients cleanup by calling
- * {@link #deleteConnection(Configuration)}.  This will shutdown the
- * zookeeper connection the HConnection was using and clean up all
+ * <p>But sharing connections makes clean up of {@link HConnection} instances a little awkward.
+ * Currently, clients cleanup by calling {@link #deleteConnection(Configuration)}. This will
+ * shutdown the zookeeper connection the HConnection was using and clean up all
  * HConnection resources as well as stopping proxies to servers out on the
  * cluster. Not running the cleanup will not end the world; it'll
  * just stall the closeup some and spew some zookeeper connection failed
@@ -150,43 +207,30 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HConnectionManager {
+  static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+
+  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
+
   // An LRU Map of HConnectionKey -> HConnection (TableServer).  All
   // access must be synchronized.  This map is not private because tests
   // need to be able to tinker with it.
-  static final Map<HConnectionKey, HConnectionImplementation> HBASE_INSTANCES;
-
-  public static final int MAX_CACHED_HBASE_INSTANCES;
-
-  /** Parameter name for what client protocol to use. */
-  public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
-
-  /** Default client protocol class name. */
-  public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
-
-  /** Parameter name for what admin protocol to use. */
-  public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
-
-  /** Default admin protocol class name. */
-  public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+  static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
 
-  public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
-
-  private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
+  public static final int MAX_CACHED_CONNECTION_INSTANCES;
 
   static {
     // We set instances to one more than the value specified for {@link
     // HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
     // connections to the ensemble from the one client is 30, so in that case we
     // should run into zk issues before the LRU hit this value of 31.
-    MAX_CACHED_HBASE_INSTANCES = HBaseConfiguration.create().getInt(
-        HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
-        HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
-    HBASE_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
-        (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
-       @Override
+    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
+      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
+    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
+        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
+      @Override
       protected boolean removeEldestEntry(
           Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
-         return size() > MAX_CACHED_HBASE_INSTANCES;
+         return size() > MAX_CACHED_CONNECTION_INSTANCES;
        }
     };
   }
@@ -194,31 +238,31 @@ public class HConnectionManager {
   /*
    * Non-instantiable.
    */
-  protected HConnectionManager() {
+  private HConnectionManager() {
     super();
   }
 
   /**
-   * Get the connection that goes with the passed <code>conf</code>
-   * configuration instance.
-   * If no current connection exists, method creates a new connection for the
-   * passed <code>conf</code> instance.
+   * Get the connection that goes with the passed <code>conf</code> configuration instance.
+   * If no current connection exists, method creates a new connection and keys it using
+   * connection-specific properties from the passed {@link Configuration}; see
+   * {@link HConnectionKey}.
    * @param conf configuration
    * @return HConnection object for <code>conf</code>
    * @throws ZooKeeperConnectionException
    */
-  public static HConnection getConnection(Configuration conf)
+  public static HConnection getConnection(final Configuration conf)
   throws IOException {
     HConnectionKey connectionKey = new HConnectionKey(conf);
-    synchronized (HBASE_INSTANCES) {
-      HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
+    synchronized (CONNECTION_INSTANCES) {
+      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
       if (connection == null) {
         connection = new HConnectionImplementation(conf, true);
-        HBASE_INSTANCES.put(connectionKey, connection);
+        CONNECTION_INSTANCES.put(connectionKey, connection);
       } else if (connection.isClosed()) {
         HConnectionManager.deleteConnection(connectionKey, true);
         connection = new HConnectionImplementation(conf, true);
-        HBASE_INSTANCES.put(connectionKey, connection);
+        CONNECTION_INSTANCES.put(connectionKey, connection);
       }
       connection.incCount();
       return connection;
@@ -226,11 +270,10 @@ public class HConnectionManager {
   }
 
   /**
-   * Create a new HConnection instance using the passed <code>conf</code>
-   * instance.
-   * Note: This bypasses the usual HConnection life cycle management!
-   * Use this with caution, the caller is responsible for closing the
-   * created connection.
+   * Create a new HConnection instance using the passed <code>conf</code> instance.
+   * <p>Note: This bypasses the usual HConnection life cycle management done by
+   * {@link #getConnection(Configuration)}. Use this with caution, the caller is responsible for
+   * calling {@link HConnection#close()} on the returned connection instance.
    * @param conf configuration
    * @return HConnection object for <code>conf</code>
    * @throws ZooKeeperConnectionException
@@ -241,22 +284,19 @@ public class HConnectionManager {
   }
 
   /**
-   * Delete connection information for the instance specified by configuration.
-   * If there are no more references to it, this will then close connection to
-   * the zookeeper ensemble and let go of all resources.
+   * Delete connection information for the instance specified by passed configuration.
+   * If there are no more references to the designated connection, this method will
+   * then close connection to the zookeeper ensemble and let go of all associated resources.
    *
-   * @param conf
-   *          configuration whose identity is used to find {@link HConnection}
-   *          instance.
+   * @param conf configuration whose identity is used to find {@link HConnection} instance.
    */
   public static void deleteConnection(Configuration conf) {
     deleteConnection(new HConnectionKey(conf), false);
   }
 
   /**
-   * Delete stale connection information for the instance specified by configuration.
-   * This will then close connection to
-   * the zookeeper ensemble and let go of all resources.
+   * Clean up a known stale connection.
+   * This will then close connection to the zookeeper ensemble and let go of all resources.
    *
    * @param connection
    */
@@ -268,22 +308,21 @@ public class HConnectionManager {
    * Delete information for all connections.
    */
   public static void deleteAllConnections() {
-    synchronized (HBASE_INSTANCES) {
+    synchronized (CONNECTION_INSTANCES) {
       Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
-      connectionKeys.addAll(HBASE_INSTANCES.keySet());
+      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
       for (HConnectionKey connectionKey : connectionKeys) {
         deleteConnection(connectionKey, false);
       }
-      HBASE_INSTANCES.clear();
+      CONNECTION_INSTANCES.clear();
     }
   }
 
   private static void deleteConnection(HConnection connection, boolean staleConnection) {
-    synchronized (HBASE_INSTANCES) {
-      for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
-          .entrySet()) {
-        if (connectionEntry.getValue() == connection) {
-          deleteConnection(connectionEntry.getKey(), staleConnection);
+    synchronized (CONNECTION_INSTANCES) {
+      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
+        if (e.getValue() == connection) {
+          deleteConnection(e.getKey(), staleConnection);
           break;
         }
       }
@@ -291,18 +330,17 @@ public class HConnectionManager {
   }
 
   private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
-    synchronized (HBASE_INSTANCES) {
-      HConnectionImplementation connection = HBASE_INSTANCES
-          .get(connectionKey);
+    synchronized (CONNECTION_INSTANCES) {
+      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
       if (connection != null) {
         connection.decCount();
         if (connection.isZeroReference() || staleConnection) {
-          HBASE_INSTANCES.remove(connectionKey);
+          CONNECTION_INSTANCES.remove(connectionKey);
           connection.internalClose();
         }
       } else {
         LOG.error("Connection not found in the list, can't delete it "+
-          "(connection key="+connectionKey+"). May be the key was modified?");
+          "(connection key=" + connectionKey + "). May be the key was modified?");
       }
     }
   }
@@ -313,14 +351,12 @@ public class HConnectionManager {
    * @return Number of cached regions for the table.
    * @throws ZooKeeperConnectionException
    */
-  static int getCachedRegionCount(Configuration conf,
-      final byte[] tableName)
+  static int getCachedRegionCount(Configuration conf, final byte[] tableName)
   throws IOException {
     return execute(new HConnectable<Integer>(conf) {
       @Override
       public Integer connect(HConnection connection) {
-        return ((HConnectionImplementation) connection)
-            .getNumberOfCachedRegionLocations(tableName);
+        return ((HConnectionImplementation)connection).getNumberOfCachedRegionLocations(tableName);
       }
     });
   }
@@ -331,8 +367,8 @@ public class HConnectionManager {
    * @return true if the region where the table and row reside is cached.
    * @throws ZooKeeperConnectionException
    */
-  static boolean isRegionCached(Configuration conf,
-      final byte[] tableName, final byte[] row) throws IOException {
+  static boolean isRegionCached(Configuration conf, final byte[] tableName, final byte[] row)
+  throws IOException {
     return execute(new HConnectable<Boolean>(conf) {
       @Override
       public Boolean connect(HConnection connection) {
@@ -342,33 +378,9 @@ public class HConnectionManager {
   }
 
   /**
-   * This class makes it convenient for one to execute a command in the context
-   * of a {@link HConnection} instance based on the given {@link Configuration}.
-   *
-   * <p>
-   * If you find yourself wanting to use a {@link HConnection} for a relatively
-   * short duration of time, and do not want to deal with the hassle of creating
-   * and cleaning up that resource, then you should consider using this
-   * convenience class.
-   *
-   * @param <T>
-   *          the return type of the {@link HConnectable#connect(HConnection)}
-   *          method.
-   */
-  public static abstract class HConnectable<T> {
-    public Configuration conf;
-
-    protected HConnectable(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public abstract T connect(HConnection connection) throws IOException;
-  }
-
-  /**
    * This convenience method invokes the given {@link HConnectable#connect}
    * implementation using a {@link HConnection} instance that lasts just for the
-   * duration of that invocation.
+   * duration of the invocation.
    *
    * @param <T> the return type of the connect method
    * @param connectable the {@link HConnectable} instance
@@ -398,127 +410,14 @@ public class HConnectionManager {
     }
   }
 
-  /**
-   * Denotes a unique key to a {@link HConnection} instance.
-   *
-   * In essence, this class captures the properties in {@link Configuration}
-   * that may be used in the process of establishing a connection. In light of
-   * that, if any new such properties are introduced into the mix, they must be
-   * added to the {@link HConnectionKey#properties} list.
-   *
-   */
-  public static class HConnectionKey {
-    final static String[] CONNECTION_PROPERTIES = new String[] {
-        HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.ZOOKEEPER_CLIENT_PORT,
-        HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
-        HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
-        HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
-        HConstants.HBASE_META_SCANNER_CACHING,
-        HConstants.HBASE_CLIENT_INSTANCE_ID };
-
-    private Map<String, String> properties;
-    private String username;
-
-    public HConnectionKey(Configuration conf) {
-      Map<String, String> m = new HashMap<String, String>();
-      if (conf != null) {
-        for (String property : CONNECTION_PROPERTIES) {
-          String value = conf.get(property);
-          if (value != null) {
-            m.put(property, value);
-          }
-        }
-      }
-      this.properties = Collections.unmodifiableMap(m);
-
-      try {
-        User currentUser = User.getCurrent();
-        if (currentUser != null) {
-          username = currentUser.getName();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Error obtaining current user, skipping username in HConnectionKey",
-            ioe);
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      if (username != null) {
-        result = username.hashCode();
-      }
-      for (String property : CONNECTION_PROPERTIES) {
-        String value = properties.get(property);
-        if (value != null) {
-          result = prime * result + value.hashCode();
-        }
-      }
-
-      return result;
-    }
-
-
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ",
-        justification="Optimization")
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      HConnectionKey that = (HConnectionKey) obj;
-      if (this.username != null && !this.username.equals(that.username)) {
-        return false;
-      } else if (this.username == null && that.username != null) {
-        return false;
-      }
-      if (this.properties == null) {
-        if (that.properties != null) {
-          return false;
-        }
-      } else {
-        if (that.properties == null) {
-          return false;
-        }
-        for (String property : CONNECTION_PROPERTIES) {
-          String thisValue = this.properties.get(property);
-          String thatValue = that.properties.get(property);
-          //noinspection StringEquality
-          if (thisValue == thatValue) {
-            continue;
-          }
-          if (thisValue == null || !thisValue.equals(thatValue)) {
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "HConnectionKey{" +
-        "properties=" + properties +
-        ", username='" + username + '\'' +
-        '}';
-    }
-  }
-
   /** Encapsulates connection to zookeeper and regionservers.*/
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+      justification="Access to the conncurrent hash map is under a lock so should be fine.")
   static class HConnectionImplementation implements HConnection, Closeable {
     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
-    private final Class<? extends AdminProtocol> adminClass;
-    private final Class<? extends ClientProtocol> clientClass;
     private final long pause;
     private final int numTries;
-    private final int maxRPCAttempts;
     private final int rpcTimeout;
     private final int prefetchRegionLimit;
     private final boolean useServerTrackerForRetries;
@@ -546,22 +445,15 @@ public class HConnectionManager {
 
     private final Configuration conf;
 
-    // client RPC
-    private RpcClientEngine rpcEngine;
-
-    // Known region ServerName.toString() -> RegionClient/Admin
-    private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
-      new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
-    private final ConcurrentHashMap<String, String> connectionLock =
-      new ConcurrentHashMap<String, String>();
+    // Client rpc instance.
+    private RpcClient rpcClient;
 
     /**
      * Map of table to table {@link HRegionLocation}s.  The table key is made
      * by doing a {@link Bytes#mapKey(byte[])} of the table's name.
      */
-    private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
-      cachedRegionLocations =
-        new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
+    private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>> cachedRegionLocations =
+      new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
 
     // The presence of a server in the map implies it's likely that there is an
     // entry in cachedRegionLocations that map to this server; but the absence
@@ -579,47 +471,33 @@ public class HConnectionManager {
 
     // indicates whether this connection's life cycle is managed (by us)
     private final boolean managed;
+
     /**
      * constructor
      * @param conf Configuration object
+     * @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
+     * to zk and shutdown of all services; we just close down the resources this connection was
+     * responsible for and decrement usage counters.  It is up to the caller to do the full
+     * cleanup.  It is set when we want to have connection sharing going on -- reuse of zk
+     * connection, cached region locations, established regionserver connections, etc.  When
+     * connections are shared, we have reference counting going on and will only do full cleanup
+     * when there are no more users of an HConnectionImplementation instance.
      */
-    @SuppressWarnings("unchecked")
-    public HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
       this.conf = conf;
       this.managed = managed;
-      String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
-        DEFAULT_ADMIN_PROTOCOL_CLASS);
       this.closed = false;
-      try {
-        this.adminClass =
-          (Class<? extends AdminProtocol>) Class.forName(adminClassName);
-      } catch (ClassNotFoundException e) {
-        throw new UnsupportedOperationException(
-            "Unable to find region server interface " + adminClassName, e);
-      }
-      String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
-        DEFAULT_CLIENT_PROTOCOL_CLASS);
-      try {
-        this.clientClass =
-          (Class<? extends ClientProtocol>) Class.forName(clientClassName);
-      } catch (ClassNotFoundException e) {
-        throw new UnsupportedOperationException(
-            "Unable to find client protocol " + clientClassName, e);
-      }
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
       this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-      this.maxRPCAttempts = conf.getInt(
-          HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
-          HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS);
+        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
       this.rpcTimeout = conf.getInt(
-          HConstants.HBASE_RPC_TIMEOUT_KEY,
-          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
       this.prefetchRegionLimit = conf.getInt(
-          HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
-          HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+        HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
+        HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+      this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true);
       long serverTrackerTimeout = 0;
       if (this.useServerTrackerForRetries) {
         // Server tracker allows us to do faster, and yet useful (hopefully), retries.
@@ -636,11 +514,7 @@ public class HConnectionManager {
       this.serverTrackerTimeout = serverTrackerTimeout;
       retrieveClusterId();
 
-      // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
-      // but we maintain access through an interface to allow overriding for tests
-      // RPC engine setup must follow obtaining the cluster ID for token authentication to work
-      this.rpcEngine = new ProtobufRpcClientEngine(this.conf, this.clusterId);
-
+      this.rpcClient = new RpcClient(this.conf, this.clusterId);
 
       // Do we publish the status?
       Class<? extends ClusterStatusListener.Listener> listenerClass =
@@ -654,14 +528,25 @@ public class HConnectionManager {
               @Override
               public void newDead(ServerName sn) {
                 clearCaches(sn);
-                rpcEngine.getClient().cancelConnections(sn.getHostname(), sn.getPort(),
-                    new SocketException(sn.getServerName() + " is dead: closing its connection."));
+                rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
+                  new SocketException(sn.getServerName() + " is dead: closing its connection."));
               }
             }, conf, listenerClass);
       }
     }
 
     /**
+     * For tests only.
+     * @param rpcClient Client we should use instead.
+     * @return Previous rpcClient
+     */
+    RpcClient setRpcClient(final RpcClient rpcClient) {
+      RpcClient oldRpcClient = this.rpcClient;
+      this.rpcClient = rpcClient;
+      return oldRpcClient;
+    }
+
+    /**
      * An identifier that will remain the same for a given connection.
      * @return
      */
@@ -706,125 +591,6 @@ public class HConnectionManager {
       return this.conf;
     }
 
-    private static class MasterProtocolState {
-      public MasterProtocol protocol;
-      public int userCount;
-      public long keepAliveUntil = Long.MAX_VALUE;
-      public final Class<? extends MasterProtocol> protocolClass;
-
-      public MasterProtocolState (
-          final Class<? extends MasterProtocol> protocolClass) {
-        this.protocolClass = protocolClass;
-      }
-    }
-
-    /**
-     * Create a new Master proxy. Try once only.
-     */
-    private MasterProtocol createMasterInterface(
-        MasterProtocolState masterProtocolState)
-        throws IOException, KeeperException, ServiceException {
-
-      ZooKeeperKeepAliveConnection zkw;
-      try {
-        zkw = getKeepAliveZooKeeperWatcher();
-      } catch (IOException e) {
-        throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
-      }
-
-      try {
-
-        checkIfBaseNodeAvailable(zkw);
-        ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
-        if (sn == null) {
-          String msg =
-            "ZooKeeper available but no active master location found";
-          LOG.info(msg);
-          throw new MasterNotRunningException(msg);
-        }
-
-
-        InetSocketAddress isa =
-          new InetSocketAddress(sn.getHostname(), sn.getPort());
-        MasterProtocol tryMaster = rpcEngine.getProxy(
-            masterProtocolState.protocolClass,
-            isa, this.conf, this.rpcTimeout);
-
-        if (tryMaster.isMasterRunning(
-            null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
-          return tryMaster;
-        } else {
-          String msg = "Can create a proxy to master, but it is not running";
-          LOG.info(msg);
-          throw new MasterNotRunningException(msg);
-        }
-      } finally {
-        zkw.close();
-      }
-    }
-
-    /**
-     * Create a master, retries if necessary.
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
-    private MasterProtocol createMasterWithRetries(
-      MasterProtocolState masterProtocolState) throws MasterNotRunningException {
-
-      // The lock must be at the beginning to prevent multiple master creation
-      //  (and leaks) in a multithread context
-
-      synchronized (this.masterAndZKLock) {
-        Exception exceptionCaught = null;
-        MasterProtocol master = null;
-        int tries = 0;
-        while (
-          !this.closed && master == null
-          ) {
-          tries++;
-          try {
-            master = createMasterInterface(masterProtocolState);
-          } catch (IOException e) {
-            exceptionCaught = e;
-          } catch (KeeperException e) {
-            exceptionCaught = e;
-          } catch (ServiceException e) {
-            exceptionCaught = e;
-          }
-
-          if (exceptionCaught != null)
-            // It failed. If it's not the last try, we're going to wait a little
-          if (tries < numTries) {
-            // tries at this point is 1 or more; decrement to start from 0.
-            long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
-            LOG.info("getMaster attempt " + tries + " of " + numTries +
-              " failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
-
-            try {
-              Thread.sleep(pauseTime);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new RuntimeException(
-                "Thread was interrupted while trying to connect to master.", e);
-            }
-
-          } else {
-            // Enough tries, we stop now
-            LOG.info("getMaster attempt " + tries + " of " + numTries +
-              " failed; no more retrying.", exceptionCaught);
-            throw new MasterNotRunningException(exceptionCaught);
-          }
-        }
-
-        if (master == null) {
-          // implies this.closed true
-          throw new MasterNotRunningException(
-            "Connection was closed while trying to get master");
-        }
-
-        return master;
-      }
-    }
-
     private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
       throws MasterNotRunningException {
       String errorMsg;
@@ -851,11 +617,16 @@ public class HConnectionManager {
      */
     @Override
     public boolean isMasterRunning()
-      throws MasterNotRunningException, ZooKeeperConnectionException {
-      // When getting the master proxy connection, we check it's running,
+    throws MasterNotRunningException, ZooKeeperConnectionException {
+      // When getting the master connection, we check it's running,
       // so if there is no exception, it means we've been able to get a
       // connection on a running master
-      getKeepAliveMasterMonitor().close();
+      MasterMonitorKeepAliveConnection m = getKeepAliveMasterMonitorService();
+      try {
+        m.close();
+      } catch (IOException e) {
+        throw new MasterNotRunningException("Failed close", e);
+      }
       return true;
     }
 
@@ -1142,7 +913,7 @@ public class HConnectionManager {
           metaLocation = locateRegion(parentTable, metaKey, true, false);
           // If null still, go around again.
           if (metaLocation == null) continue;
-          ClientProtocol server = getClient(metaLocation.getServerName());
+          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());
 
           Result regionInfoRow;
           // This block guards against two threads trying to load the meta
@@ -1172,7 +943,7 @@ public class HConnectionManager {
               forceDeleteCachedLocation(tableName, row);
             }
             // Query the meta region for the location of the meta region
-            regionInfoRow = ProtobufUtil.getRowOrBefore(server,
+            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
               metaLocation.getRegionInfo().getRegionName(), metaKey,
               HConstants.CATALOG_FAMILY);
           }
@@ -1231,7 +1002,7 @@ public class HConnectionManager {
           throw e;
         } catch (IOException e) {
           if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+            e = ((RemoteException)e).unwrapRemoteException();
           }
           if (tries < numTries - 1) {
             if (LOG.isDebugEnabled()) {
@@ -1445,103 +1216,334 @@ public class HConnectionManager {
       }
     }
 
-    @Override
-    @Deprecated
-    public AdminProtocol getAdmin(final String hostname, final int port) throws IOException {
-      return getAdmin(new ServerName(hostname, port, 0L));
+    // Map keyed by service name + regionserver to service stub implementation
+    private final ConcurrentHashMap<String, Object> stubs =
+      new ConcurrentHashMap<String, Object>();
+    // Map of locks used when creating service stubs per regionserver.
+    private final ConcurrentHashMap<String, String> connectionLock =
+      new ConcurrentHashMap<String, String>();
+
+    /**
+     * Maintains current state of MasterService instance.
+     */
+    static abstract class MasterServiceState {
+      HConnection connection;
+      int userCount;
+      long keepAliveUntil = Long.MAX_VALUE;
+
+      MasterServiceState (final HConnection connection) {
+        super();
+        this.connection = connection;
+      }
+
+      abstract Object getStub();
+      abstract void clearStub();
+      abstract boolean isMasterRunning() throws ServiceException;
     }
 
-    @Override
-    public AdminProtocol getAdmin(final ServerName serverName)
-        throws IOException {
-      return getAdmin(serverName, false);
+    /**
+     * State of the MasterAdminService connection/setup.
+     */
+    static class MasterAdminServiceState extends MasterServiceState {
+      MasterAdminService.BlockingInterface stub;
+      MasterAdminServiceState(final HConnection connection) {
+        super(connection);
+      }
+
+      @Override
+      public String toString() {
+        return "MasterAdminService";
+      }
+
+      @Override
+      Object getStub() {
+        return this.stub;
+      }
+
+      @Override
+      void clearStub() {
+        this.stub = null;
+      }
+
+      @Override
+      boolean isMasterRunning() throws ServiceException {
+        MasterProtos.IsMasterRunningResponse response =
+          this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+        return response != null? response.getIsMasterRunning(): false;
+      }
     }
 
-    @Override
-    @Deprecated
-    public ClientProtocol getClient(final String hostname, final int port)
-    throws IOException {
-      return (ClientProtocol)getProtocol(hostname, port, clientClass);
+    /**
+     * State of the MasterMonitorService connection/setup.
+     */
+    static class MasterMonitorServiceState extends MasterServiceState {
+      MasterMonitorService.BlockingInterface stub;
+      MasterMonitorServiceState(final HConnection connection) {
+        super(connection);
+      }
+
+      @Override
+      public String toString() {
+        return "MasterMonitorService";
+      }
+
+      @Override
+      Object getStub() {
+        return this.stub;
+      }
+
+      @Override
+      void clearStub() {
+        this.stub = null;
+      }
+
+      @Override
+      boolean isMasterRunning() throws ServiceException {
+        MasterProtos.IsMasterRunningResponse response =
+          this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+        return response != null? response.getIsMasterRunning(): false;
+      }
     }
 
-    @Override
-    public ClientProtocol getClient(final ServerName serverName)
-        throws IOException {
-      if (isDeadServer(serverName)){
-        throw new RegionServerStoppedException("The server " + serverName + " is dead.");
+    /**
+     * Makes a client-side stub for master services. Sub-class to specialize.
+     * Depends on hosting class so not static.  Exists so we avoid duplicating a bunch of code
+     * when setting up the MasterMonitorService and MasterAdminService.
+     */
+    abstract class StubMaker {
+      /**
+       * Returns the name of the service stub being created.
+       */
+      protected abstract String getServiceName();
+
+      /**
+       * Make the stub and cache it internally so it can be used later for the isMasterRunning call.
+       * @param channel
+       */
+      protected abstract Object makeStub(final BlockingRpcChannel channel);
+
+      /**
+       * Once set up, check that the stub works by doing an isMasterRunning check.
+       * @throws ServiceException
+       */
+      protected abstract void isMasterRunning() throws ServiceException;
+
+      /**
+       * Create a stub. Try once only.  It is not typed because there is no common type to
+       * protobuf services nor their interfaces.  Let the caller do appropriate casting.
+       * @return A stub for master services.
+       * @throws IOException
+       * @throws KeeperException
+       * @throws ServiceException
+       */
+      private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
+        ZooKeeperKeepAliveConnection zkw;
+        try {
+          zkw = getKeepAliveZooKeeperWatcher();
+        } catch (IOException e) {
+          throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
+        }
+        try {
+          checkIfBaseNodeAvailable(zkw);
+          ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
+          if (sn == null) {
+            String msg = "ZooKeeper available but no active master location found";
+            LOG.info(msg);
+            throw new MasterNotRunningException(msg);
+          }
+          if (isDeadServer(sn)) {
+            throw new MasterNotRunningException(sn + " is dead.");
+          }
+          // Use the security info interface name as our stub key
+          String key = getStubKey(getServiceName(), sn.getHostAndPort());
+          connectionLock.putIfAbsent(key, key);
+          Object stub = null;
+          synchronized (connectionLock.get(key)) {
+            stub = stubs.get(key);
+            if (stub == null) {
+              BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
+                  User.getCurrent(), rpcTimeout);
+              stub = makeStub(channel);
+              isMasterRunning();
+              stubs.put(key, stub);
+            }
+          }
+          return stub;
+        } finally {
+          zkw.close();
+        }
+      }
+
+      /**
+       * Create a stub against the master.  Retry if necessary.
+       * @return A stub for this maker's service against the master
+       * @throws MasterNotRunningException
+       */
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD")
+      Object makeStub() throws MasterNotRunningException {
+        // The lock must be at the beginning to prevent multiple master creations
+        //  (and leaks) in a multithread context
+        synchronized (masterAndZKLock) {
+          Exception exceptionCaught = null;
+          Object stub = null;
+          int tries = 0;
+          while (!closed && stub == null) {
+            tries++;
+            try {
+              stub = makeStubNoRetries();
+            } catch (IOException e) {
+              exceptionCaught = e;
+            } catch (KeeperException e) {
+              exceptionCaught = e;
+            } catch (ServiceException e) {
+              exceptionCaught = e;
+            }
+
+            if (exceptionCaught != null)
+              // It failed. If it's not the last try, we're going to wait a little
+              if (tries < numTries) {
+                // tries at this point is 1 or more; decrement to start from 0.
+                long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
+                LOG.info("getMaster attempt " + tries + " of " + numTries +
+                    " failed; retrying after sleep of " +pauseTime + ", exception=" +
+                  exceptionCaught);
+
+                try {
+                  Thread.sleep(pauseTime);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(
+                      "Thread was interrupted while trying to connect to master.", e);
+                }
+              } else {
+                // Enough tries, we stop now
+                LOG.info("getMaster attempt " + tries + " of " + numTries +
+                    " failed; no more retrying.", exceptionCaught);
+                throw new MasterNotRunningException(exceptionCaught);
+              }
+          }
+
+          if (stub == null) {
+            // implies this.closed true
+            throw new MasterNotRunningException("Connection was closed while trying to get master");
+          }
+          return stub;
+        }
       }
-      return (ClientProtocol)
-          getProtocol(serverName.getHostname(), serverName.getPort(), clientClass);
     }
 
-    @Override
-    @Deprecated
-    public AdminProtocol getAdmin(final String hostname, final int port,
-        final boolean master)
-    throws IOException {
-      return (AdminProtocol)getProtocol(hostname, port, adminClass);
+    /**
+     * Class to make a MasterMonitorService stub.
+     */
+    class MasterMonitorServiceStubMaker extends StubMaker {
+      private MasterMonitorService.BlockingInterface stub;
+      @Override
+      protected String getServiceName() {
+        return MasterMonitorService.getDescriptor().getName();
+      }
+
+      @Override
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+      MasterMonitorService.BlockingInterface makeStub() throws MasterNotRunningException {
+        return (MasterMonitorService.BlockingInterface)super.makeStub();
+      }
+
+      @Override
+      protected Object makeStub(BlockingRpcChannel channel) {
+        this.stub = MasterMonitorService.newBlockingStub(channel);
+        return this.stub;
+      }
+
+      @Override
+      protected void isMasterRunning() throws ServiceException {
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      }
     }
 
+    /**
+     * Class to make a MasterAdminService stub.
+     */
+    class MasterAdminServiceStubMaker extends StubMaker {
+      private MasterAdminService.BlockingInterface stub;
+
+      @Override
+      protected String getServiceName() {
+        return MasterAdminService.getDescriptor().getName();
+      }
+
+      @Override
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+      MasterAdminService.BlockingInterface makeStub() throws MasterNotRunningException {
+        return (MasterAdminService.BlockingInterface)super.makeStub();
+      }
+
+      @Override
+      protected Object makeStub(BlockingRpcChannel channel) {
+        this.stub = MasterAdminService.newBlockingStub(channel);
+        return this.stub;
+      }
+
+      @Override
+      protected void isMasterRunning() throws ServiceException {
+        this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+      }
+    };
+
     @Override
-    public AdminProtocol getAdmin(final ServerName serverName, final boolean master)
+    public AdminService.BlockingInterface getAdmin(final ServerName serverName)
         throws IOException {
-      if (isDeadServer(serverName)){
-        throw new RegionServerStoppedException("The server " + serverName + " is dead.");
+      return getAdmin(serverName, false);
+    }
+
+    @Override
+    // Nothing is done w/ the 'master' parameter.  It is ignored.
+    public AdminService.BlockingInterface getAdmin(final ServerName serverName,
+      final boolean master)
+    throws IOException {
+      if (isDeadServer(serverName)) {
+        throw new RegionServerStoppedException(serverName + " is dead.");
+      }
+      String key = getStubKey(AdminService.BlockingInterface.class.getName(),
+        serverName.getHostAndPort());
+      this.connectionLock.putIfAbsent(key, key);
+      AdminService.BlockingInterface stub = null;
+      synchronized (this.connectionLock.get(key)) {
+        stub = (AdminService.BlockingInterface)this.stubs.get(key);
+        if (stub == null) {
+          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
+            User.getCurrent(), this.rpcTimeout);
+          stub = AdminService.newBlockingStub(channel);
+          this.stubs.put(key, stub);
+        }
       }
-      return (AdminProtocol)getProtocol(
-          serverName.getHostname(), serverName.getPort(), adminClass);
+      return stub;
     }
 
-    /**
-     * Either the passed <code>isa</code> is null or <code>hostname</code>
-     * can be but not both.
-     * @param hostname
-     * @param port
-     * @param protocolClass
-     * @return Proxy.
-     * @throws IOException
-     */
-    IpcProtocol getProtocol(final String hostname,
-        final int port, final Class <? extends IpcProtocol> protocolClass)
+    @Override
+    public ClientService.BlockingInterface getClient(final ServerName sn)
     throws IOException {
-      String rsName = Addressing.createHostAndPortStr(hostname, port);
-      // See if we already have a connection (common case)
-      Map<String, IpcProtocol> protocols = this.servers.get(rsName);
-      if (protocols == null) {
-        protocols = new HashMap<String, IpcProtocol>();
-        Map<String, IpcProtocol> existingProtocols =
-          this.servers.putIfAbsent(rsName, protocols);
-        if (existingProtocols != null) {
-          protocols = existingProtocols;
-        }
-      }
-      String protocol = protocolClass.getName();
-      IpcProtocol server = protocols.get(protocol);
-      if (server == null) {
-        // create a unique lock for this RS + protocol (if necessary)
-        String lockKey = protocol + "@" + rsName;
-        this.connectionLock.putIfAbsent(lockKey, lockKey);
-        // get the RS lock
-        synchronized (this.connectionLock.get(lockKey)) {
-          // do one more lookup in case we were stalled above
-          server = protocols.get(protocol);
-          if (server == null) {
-            try {
-              // Only create isa when we need to.
-              InetSocketAddress address = new InetSocketAddress(hostname, port);
-              // definitely a cache miss. establish an RPC for this RS
-              server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
-                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
-              protocols.put(protocol, server);
-            } catch (RemoteException e) {
-              LOG.warn("RemoteException connecting to RS", e);
-              // Throw what the RemoteException was carrying.
-              throw e.unwrapRemoteException();
-            }
-          }
+      if (isDeadServer(sn)) {
+        throw new RegionServerStoppedException(sn + " is dead.");
+      }
+      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
+      this.connectionLock.putIfAbsent(key, key);
+      ClientService.BlockingInterface stub = null;
+      synchronized (this.connectionLock.get(key)) {
+        stub = (ClientService.BlockingInterface)this.stubs.get(key);
+        if (stub == null) {
+          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
+          User.getCurrent(), this.rpcTimeout);
+          stub = ClientService.newBlockingStub(channel);
+          // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
+          // Just fail on first actual call rather than in here on setup.
+          this.stubs.put(key, stub);
         }
       }
-      return server;
+      return stub;
+    }
+
+    static String getStubKey(final String serviceName, final String rsHostnamePort) {
+      return serviceName + "@" + rsHostnamePort;
     }
 
     @Override
@@ -1570,14 +1572,12 @@ public class HConnectionManager {
     private static final long keepAlive = 5 * 60 * 1000;
 
     /**
-     * Retrieve a shared ZooKeeperWatcher. You must close it it once you've have
-     *  finished with it.
+     * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
      * @return The shared instance. Never returns null.
      */
     public ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
       throws IOException {
       synchronized (masterAndZKLock) {
-
         if (keepAliveZookeeper == null) {
           // We don't check that our link to ZooKeeper is still valid
           // But there is a retry mechanism in the ZooKeeperWatcher itself
@@ -1586,7 +1586,6 @@ public class HConnectionManager {
         }
         keepAliveZookeeperUserCount++;
         keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
-
         return keepAliveZookeeper;
       }
     }
@@ -1608,7 +1607,7 @@ public class HConnectionManager {
     /**
      * Creates a Chore thread to check the connections to master & zookeeper
      *  and close them when they reach their closing time (
-     *  {@link MasterProtocolState#keepAliveUntil} and
+     *  {@link MasterServiceState#keepAliveUntil} and
      *  {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
      *  managed by the release functions and the variable {@link #keepAlive}
      */
@@ -1636,9 +1635,9 @@ public class HConnectionManager {
         return new DelayedClosing(hci, stoppable);
       }
 
-      protected void closeMasterProtocol(MasterProtocolState protocolState) {
+      protected void closeMasterProtocol(MasterServiceState protocolState) {
         if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
-          hci.closeMasterProtocol(protocolState);
+          hci.closeMasterService(protocolState);
           protocolState.keepAliveUntil = Long.MAX_VALUE;
         }
       }
@@ -1654,8 +1653,8 @@ public class HConnectionManager {
               hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
             }
           }
-          closeMasterProtocol(hci.masterAdminProtocol);
-          closeMasterProtocol(hci.masterMonitorProtocol);
+          closeMasterProtocol(hci.adminMasterServiceState);
+          closeMasterProtocol(hci.monitorMasterServiceState);
         }
       }
 
@@ -1683,117 +1682,289 @@ public class HConnectionManager {
       }
     }
 
-    private static class MasterProtocolHandler implements InvocationHandler {
-      private HConnectionImplementation connection;
-      private MasterProtocolState protocolStateTracker;
+    final MasterAdminServiceState adminMasterServiceState = new MasterAdminServiceState(this);
+    final MasterMonitorServiceState monitorMasterServiceState =
+      new MasterMonitorServiceState(this);
 
-      protected MasterProtocolHandler(HConnectionImplementation connection,
-                                    MasterProtocolState protocolStateTracker) {
-        this.connection = connection;
-        this.protocolStateTracker = protocolStateTracker;
-      }
-
-      @Override
-      public Object invoke(Object proxy, Method method, Object[] args)
-        throws Throwable {
-        if (method.getName().equals("close") &&
-              method.getParameterTypes().length == 0) {
-          release(connection, protocolStateTracker);
-          return null;
-        } else {
-          try {
-            return method.invoke(protocolStateTracker.protocol, args);
-          }catch  (InvocationTargetException e){
-            // We will have this for all the exception, checked on not, sent
-            //  by any layer, including the functional exception
-            Throwable cause = e.getCause();
-            if (cause == null){
-              throw new RuntimeException(
-                "Proxy invocation failed and getCause is null", e);
-            }
-            if (cause instanceof UndeclaredThrowableException) {
-              cause = cause.getCause();
-            }
-            throw cause;
-          }
-        }
-      }
+    @Override
+    public MasterAdminService.BlockingInterface getMasterAdmin() throws MasterNotRunningException {
+      return getKeepAliveMasterAdminService();
+    }
 
-      private void release(
-        HConnectionImplementation connection,
-        MasterProtocolState target) {
-        connection.releaseMaster(target);
-      }
+    @Override
+    public MasterMonitorService.BlockingInterface getMasterMonitor()
+    throws MasterNotRunningException {
+      return getKeepAliveMasterMonitorService();
     }
 
-    MasterProtocolState masterAdminProtocol =
-      new MasterProtocolState(MasterAdminProtocol.class);
-    MasterProtocolState masterMonitorProtocol =
-      new MasterProtocolState(MasterMonitorProtocol.class);
+    private void resetMasterServiceState(final MasterServiceState mss) {
+      mss.userCount++;
+      mss.keepAliveUntil = Long.MAX_VALUE;
+    }
 
-    /**
-     * This function allows HBaseAdmin and potentially others
-     * to get a shared master connection.
-     *
-     * @return The shared instance. Never returns null.
-     * @throws MasterNotRunningException
-     */
-    private Object getKeepAliveMasterProtocol(
-        MasterProtocolState protocolState, Class connectionClass)
-        throws MasterNotRunningException {
+    @Override
+    public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService()
+    throws MasterNotRunningException {
       synchronized (masterAndZKLock) {
-        if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
-          protocolState.protocol = null;
-          protocolState.protocol = createMasterWithRetries(protocolState);
+        if (!isKeepAliveMasterConnectedAndRunning(this.adminMasterServiceState)) {
+          MasterAdminServiceStubMaker stubMaker = new MasterAdminServiceStubMaker();
+          this.adminMasterServiceState.stub = stubMaker.makeStub();
+        }
+        resetMasterServiceState(this.adminMasterServiceState);
+      }
+      // Ugly delegation just so we can add a close() method.
+      final MasterAdminService.BlockingInterface stub = this.adminMasterServiceState.stub;
+      return new MasterAdminKeepAliveConnection() {
+        MasterAdminServiceState mss = adminMasterServiceState;
+        @Override
+        public AddColumnResponse addColumn(RpcController controller,
+            AddColumnRequest request) throws ServiceException {
+          return stub.addColumn(controller, request);
         }
-        protocolState.userCount++;
-        protocolState.keepAliveUntil = Long.MAX_VALUE;
 
-        return Proxy.newProxyInstance(
-          connectionClass.getClassLoader(),
-          new Class[]{connectionClass},
-          new MasterProtocolHandler(this, protocolState)
-        );
-      }
-    }
+        @Override
+        public DeleteColumnResponse deleteColumn(RpcController controller,
+            DeleteColumnRequest request) throws ServiceException {
+          return stub.deleteColumn(controller, request);
+        }
 
-    @Override
-    public MasterAdminProtocol getMasterAdmin() throws MasterNotRunningException {
-      return getKeepAliveMasterAdmin();
-    }
+        @Override
+        public ModifyColumnResponse modifyColumn(RpcController controller,
+            ModifyColumnRequest request) throws ServiceException {
+          return stub.modifyColumn(controller, request);
+        }
 
-    @Override
-    public MasterMonitorProtocol getMasterMonitor() throws MasterNotRunningException {
-      return getKeepAliveMasterMonitor();
+        @Override
+        public MoveRegionResponse moveRegion(RpcController controller,
+            MoveRegionRequest request) throws ServiceException {
+          return stub.moveRegion(controller, request);
+        }
+
+        @Override
+        public DispatchMergingRegionsResponse dispatchMergingRegions(
+            RpcController controller, DispatchMergingRegionsRequest request)
+            throws ServiceException {
+          return stub.dispatchMergingRegions(controller, request);
+        }
+
+        @Override
+        public AssignRegionResponse assignRegion(RpcController controller,
+            AssignRegionRequest request) throws ServiceException {
+          return stub.assignRegion(controller, request);
+        }
+
+        @Override
+        public UnassignRegionResponse unassignRegion(RpcController controller,
+            UnassignRegionRequest request) throws ServiceException {
+          return stub.unassignRegion(controller, request);
+        }
+
+        @Override
+        public OfflineRegionResponse offlineRegion(RpcController controller,
+            OfflineRegionRequest request) throws ServiceException {
+          return stub.offlineRegion(controller, request);
+        }
+
+        @Override
+        public DeleteTableResponse deleteTable(RpcController controller,
+            DeleteTableRequest request) throws ServiceException {
+          return stub.deleteTable(controller, request);
+        }
+
+        @Override
+        public EnableTableResponse enableTable(RpcController controller,
+            EnableTableRequest request) throws ServiceException {
+          return stub.enableTable(controller, request);
+        }
+
+        @Override
+        public DisableTableResponse disableTable(RpcController controller,
+            DisableTableRequest request) throws ServiceException {
+          return stub.disableTable(controller, request);
+        }
+
+        @Override
+        public ModifyTableResponse modifyTable(RpcController controller,
+            ModifyTableRequest request) throws ServiceException {
+          return stub.modifyTable(controller, request);
+        }
+
+        @Override
+        public CreateTableResponse createTable(RpcController controller,
+            CreateTableRequest request) throws ServiceException {
+          return stub.createTable(controller, request);
+        }
+
+        @Override
+        public ShutdownResponse shutdown(RpcController controller,
+            ShutdownRequest request) throws ServiceException {
+          return stub.shutdown(controller, request);
+        }
+
+        @Override
+        public StopMasterResponse stopMaster(RpcController controller,
+            StopMasterRequest request) throws ServiceException {
+          return stub.stopMaster(controller, request);
+        }
+
+        @Override
+        public BalanceResponse balance(RpcController controller,
+            BalanceRequest request) throws ServiceException {
+          return stub.balance(controller, request);
+        }
+
+        @Override
+        public SetBalancerRunningResponse setBalancerRunning(
+            RpcController controller, SetBalancerRunningRequest request)
+            throws ServiceException {
+          return stub.setBalancerRunning(controller, request);
+        }
+
+        @Override
+        public CatalogScanResponse runCatalogScan(RpcController controller,
+            CatalogScanRequest request) throws ServiceException {
+          return stub.runCatalogScan(controller, request);
+        }
+
+        @Override
+        public EnableCatalogJanitorResponse enableCatalogJanitor(
+            RpcController controller, EnableCatalogJanitorRequest request)
+            throws ServiceException {
+          return stub.enableCatalogJanitor(controller, request);
+        }
+
+        @Override
+        public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
+            RpcController controller, IsCatalogJanitorEnabledRequest request)
+            throws ServiceException {
+          return stub.isCatalogJanitorEnabled(controller, request);
+        }
+
+        @Override
+        public CoprocessorServiceResponse execMasterService(
+            RpcController controller, CoprocessorServiceRequest request)
+            throws ServiceException {
+          return stub.execMasterService(controller, request);
+        }
+
+        @Override
+        public TakeSnapshotResponse snapshot(RpcController controller,
+            TakeSnapshotRequest request) throws ServiceException {
+          return stub.snapshot(controller, request);
+        }
+
+        @Override
+        public ListSnapshotResponse getCompletedSnapshots(
+            RpcController controller, ListSnapshotRequest request)
+            throws ServiceException {
+          return stub.getCompletedSnapshots(controller, request);
+        }
+
+        @Override
+        public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
+            DeleteSnapshotRequest request) throws ServiceException {
+          return stub.deleteSnapshot(controller, request);
+        }
+
+        @Override
+        public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
+            IsSnapshotDoneRequest request) throws ServiceException {
+          return stub.isSnapshotDone(controller, request);
+        }
+
+        @Override
+        public RestoreSnapshotResponse restoreSnapshot(
+            RpcController controller, RestoreSnapshotRequest request)
+            throws ServiceException {
+          return stub.restoreSnapshot(controller, request);
+        }
+
+        @Override
+        public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
+            RpcController controller, IsRestoreSnapshotDoneRequest request)
+            throws ServiceException {
+          return stub.isRestoreSnapshotDone(controller, request);
+        }
+
+        @Override
+        public IsMasterRunningResponse isMasterRunning(
+            RpcController controller, IsMasterRunningRequest request)
+            throws ServiceException {
+          return stub.isMasterRunning(controller, request);
+        }
+
+        @Override
+        public void close() {
+          release(this.mss);
+        }
+      };
     }
 
-    @Override
-    public MasterAdminKeepAliveConnection getKeepAliveMasterAdmin()
-        throws MasterNotRunningException {
-      return (MasterAdminKeepAliveConnection)
-        getKeepAliveMasterProtocol(masterAdminProtocol, MasterAdminKeepAliveConnection.class);
+    private static void release(MasterServiceState mss) {
+      if (mss != null && mss.connection != null) {
+        ((HConnectionImplementation)mss.connection).releaseMaster(mss);
+      }
     }
 
     @Override
-    public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitor()
-        throws MasterNotRunningException {
-      return (MasterMonitorKeepAliveConnection)
-        getKeepAliveMasterProtocol(masterMonitorProtocol, MasterMonitorKeepAliveConnection.class);
+    public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService()
+    throws MasterNotRunningException {
+      synchronized (masterAndZKLock) {
+        if (!isKeepAliveMasterConnectedAndRunning(this.monitorMasterServiceState)) {
+          MasterMonitorServiceStubMaker stubMaker = new MasterMonitorServiceStubMaker();
+          this.monitorMasterServiceState.stub = stubMaker.makeStub();
+        }
+        resetMasterServiceState(this.monitorMasterServiceState);
+      }
+      // Ugly delegation just so we can implement close().
+      final MasterMonitorService.BlockingInterface stub = this.monitorMasterServiceState.stub;
+      return new MasterMonitorKeepAliveConnection() {
+        final MasterMonitorServiceState mss = monitorMasterServiceState;
+        @Override
+        public GetSchemaAlterStatusResponse getSchemaAlterStatus(
+            RpcController controller, GetSchemaAlterStatusRequest request)
+            throws ServiceException {
+          return stub.getSchemaAlterStatus(controller, request);
+        }
+
+        @Override
+        public GetTableDescriptorsResponse getTableDescriptors(
+            RpcController controller, GetTableDescriptorsRequest request)
+            throws ServiceException {
+          return stub.getTableDescriptors(controller, request);
+        }
+
+        @Override
+        public GetClusterStatusResponse getClusterStatus(
+            RpcController controller, GetClusterStatusRequest request)
+            throws ServiceException {
+          return stub.getClusterStatus(controller, request);
+        }
+
+        @Override
+        public IsMasterRunningResponse isMasterRunning(
+            RpcController controller, IsMasterRunningRequest request)
+            throws ServiceException {
+          return stub.isMasterRunning(controller, request);
+        }
+
+        @Override
+        public void close() throws IOException {
+          release(this.mss);
+        }
+      };
     }
 
-    private boolean isKeepAliveMasterConnectedAndRunning(MasterProtocolState protocolState){
-      if (protocolState.protocol == null){
+    private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
+      if (mss.getStub() == null){
         return false;
       }
       try {
-         return protocolState.protocol.isMasterRunning(
-           null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning();
-      }catch (UndeclaredThrowableException e){
+        return mss.isMasterRunning();
+      } catch (UndeclaredThrowableException e) {
         // It's somehow messy, but we can receive exceptions such as
-        //  java.net.ConnectException but they're not declared. So we catch
-        //  it...
-        LOG.info("Master connection is not running anymore",
-          e.getUndeclaredThrowable());
+        //  java.net.ConnectException but they're not declared. So we catch it...
+        LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
         return false;
       } catch (ServiceException se) {
         LOG.warn("Checking master connection", se);
@@ -1801,35 +1972,32 @@ public class HConnectionManager {
       }
     }
 
-   private void releaseMaster(MasterProtocolState protocolState) {
-      if (protocolState.protocol == null){
-        return;
-      }
+    void releaseMaster(MasterServiceState mss) {
+      if (mss.getStub() == null) return;
       synchronized (masterAndZKLock) {
-        --protocolState.userCount;
-        if (protocolState.userCount <= 0) {
-          protocolState.keepAliveUntil =
-            System.currentTimeMillis() + keepAlive;
+        --mss.userCount;
+        if (mss.userCount <= 0) {
+          mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
         }
       }
     }
 
-    private void closeMasterProtocol(MasterProtocolState protocolState) {
-      if (protocolState.protocol != null){
-        LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
-        protocolState.protocol = null;
+    private void closeMasterService(MasterServiceState mss) {
+      if (mss.getStub() != null) {
+        LOG.info("Closing master protocol: " + mss);
+        mss.clearStub();
       }
-      protocolState.userCount = 0;
+      mss.userCount = 0;
     }
 
     /**
-     * Immediate close of the shared master. Can be by the delayed close or
-     *  when closing the connection itself.
+     * Immediate close of the shared master. Can be triggered by the delayed close or when
+     * closing the connection itself.
      */
     private void closeMaster() {
       synchronized (masterAndZKLock) {
-        closeMasterProtocol(masterAdminProtocol);
-        closeMasterProtocol(masterMonitorProtocol);
+        closeMasterService(adminMasterServiceState);
+        closeMasterService(monitorMasterServiceState);
       }
     }
 
@@ -2473,8 +2641,7 @@ public class HConnectionManager {
       delayedClosing.stop("Closing connection");
       closeMaster();
       closeZooKeeperWatcher();
-      this.servers.clear();
-      this.rpcEngine.close();
+      this.stubs.clear();
       if (clusterStatusListener != null) {
         clusterStatusListener.close();
       }
@@ -2515,7 +2682,7 @@ public class HConnectionManager {
 
     @Override
     public HTableDescriptor[] listTables() throws IOException {
-      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
       try {
         GetTableDescriptorsRequest req =
           RequestConverter.buildGetTableDescriptorsRequest(null);
@@ -2530,7 +2697,7 @@ public class HConnectionManager {
     @Override
     public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
       if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
-      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
       try {
         GetTableDescriptorsRequest req =
           RequestConverter.buildGetTableDescriptorsRequest(tableNames);
@@ -2556,7 +2723,7 @@ public class HConnectionManager {
       if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
         return HTableDescriptor.META_TABLEDESC;
       }
-      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitor();
+      MasterMonitorKeepAliveConnection master = getKeepAliveMasterMonitorService();
       GetTableDescriptorsResponse htds;
       try {
         GetTableDescriptorsRequest req =
@@ -2576,14 +2743,6 @@ public class HConnectionManager {
     }
 
     /**
-     * Override the RpcClientEngine implementation used by this connection.
-     * <strong>FOR TESTING PURPOSES ONLY!</strong>
-     */
-    void setRpcEngine(RpcClientEngine engine) {
-      this.rpcEngine = engine;
-    }
-
-    /**
      * The record of errors for servers. Visible for testing.
      */
     @VisibleForTesting
@@ -2685,17 +2844,15 @@ public class HConnectionManager {
    * @param c The Configuration instance to set the retries into.
    * @param log Used to log what we set in here.
    */
-  public static void setServerSideHConnectionRetries(final Configuration c,
+  public static void setServerSideHConnectionRetries(final Configuration c, final String sn,
       final Log log) {
     int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     // Go big.  Multiply by 10.  If we can't get to meta after this many retries
     // then something seriously wrong.
-    int serversideMultiplier =
-      c.getInt("hbase.client.serverside.retries.multiplier", 10);
+    int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
     int retries = hcRetries * serversideMultiplier;
     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
-    log.debug("HConnection retries=" + retries);
+    log.debug(sn + " HConnection server-to-server retries=" + retries);
   }
 }
-