You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2013/01/19 00:32:40 UTC

svn commit: r1435414 - in /hbase/trunk/hbase-server/src: main/jamon/org/apache/hadoop/hbase/tmpl/master/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apac...

Author: garyh
Date: Fri Jan 18 23:32:39 2013
New Revision: 1435414

URL: http://svn.apache.org/viewvc?rev=1435414&view=rev
Log:
HBASE-7460  Cleanup RPC client connection layers

Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
Modified:
    hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
    hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java

Modified: hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon (original)
+++ hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon Fri Jan 18 23:32:39 2013
@@ -283,7 +283,7 @@ org.apache.hadoop.hbase.HBaseConfigurati
 <%def userTables>
 <%java>
    HTableDescriptor[] tables = admin.listTables();
-   HConnectionManager.deleteConnection(admin.getConfiguration(), false);
+   HConnectionManager.deleteConnection(admin.getConfiguration());
 </%java>
 <%if (tables != null && tables.length > 0)%>
 <table class="table table-striped">

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Jan 18 23:32:39 2013
@@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.client.Me
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
@@ -117,7 +119,7 @@ import com.google.protobuf.ServiceExcept
  * <p>But sharing connections
  * makes clean up of {@link HConnection} instances a little awkward.  Currently,
  * clients cleanup by calling
- * {@link #deleteConnection(Configuration, boolean)}.  This will shutdown the
+ * {@link #deleteConnection(Configuration)}.  This will shutdown the
  * zookeeper connection the HConnection was using and clean up all
  * HConnection resources as well as stopping proxies to servers out on the
  * cluster. Not running the cleanup will not end the world; it'll
@@ -138,7 +140,7 @@ import com.google.protobuf.ServiceExcept
  * }
  * </pre>
  * <p>Cleanup used to be done inside in a shutdown hook.  On startup we'd
- * register a shutdown hook that called {@link #deleteAllConnections(boolean)}
+ * register a shutdown hook that called {@link #deleteAllConnections()}
  * on its way out but the order in which shutdown hooks run is not defined so
  * were problematic for clients of HConnection that wanted to register their
  * own shutdown hooks so we removed ours though this shifts the onus for
@@ -212,7 +214,7 @@ public class HConnectionManager {
         connection = new HConnectionImplementation(conf, true);
         HBASE_INSTANCES.put(connectionKey, connection);
       } else if (connection.isClosed()) {
-        HConnectionManager.deleteConnection(connectionKey, true, true);
+        HConnectionManager.deleteConnection(connectionKey, true);
         connection = new HConnectionImplementation(conf, true);
         HBASE_INSTANCES.put(connectionKey, connection);
       }
@@ -244,14 +246,9 @@ public class HConnectionManager {
    * @param conf
    *          configuration whose identity is used to find {@link HConnection}
    *          instance.
-   * @param stopProxy
-   *          Shuts down all the proxy's put up to cluster members including to
-   *          cluster HMaster. Calls
-   *          {@link HBaseClientRPC#stopProxy(IpcProtocol)}
-   *          .
    */
-  public static void deleteConnection(Configuration conf, boolean stopProxy) {
-    deleteConnection(new HConnectionKey(conf), stopProxy, false);
+  public static void deleteConnection(Configuration conf) {
+    deleteConnection(new HConnectionKey(conf), false);
   }
 
   /**
@@ -262,40 +259,37 @@ public class HConnectionManager {
    * @param connection
    */
   public static void deleteStaleConnection(HConnection connection) {
-    deleteConnection(connection, true, true);
+    deleteConnection(connection, true);
   }
 
   /**
    * Delete information for all connections.
-   * @param stopProxy stop the proxy as well
    * @throws IOException
    */
-  public static void deleteAllConnections(boolean stopProxy) {
+  public static void deleteAllConnections() {
     synchronized (HBASE_INSTANCES) {
       Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
       connectionKeys.addAll(HBASE_INSTANCES.keySet());
       for (HConnectionKey connectionKey : connectionKeys) {
-        deleteConnection(connectionKey, stopProxy, false);
+        deleteConnection(connectionKey, false);
       }
       HBASE_INSTANCES.clear();
     }
   }
 
-  private static void deleteConnection(HConnection connection, boolean stopProxy,
-      boolean staleConnection) {
+  private static void deleteConnection(HConnection connection, boolean staleConnection) {
     synchronized (HBASE_INSTANCES) {
       for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
           .entrySet()) {
         if (connectionEntry.getValue() == connection) {
-          deleteConnection(connectionEntry.getKey(), stopProxy, staleConnection);
+          deleteConnection(connectionEntry.getKey(), staleConnection);
           break;
         }
       }
     }
   }
 
-  private static void deleteConnection(HConnectionKey connectionKey,
-      boolean stopProxy, boolean staleConnection) {
+  private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
     synchronized (HBASE_INSTANCES) {
       HConnectionImplementation connection = HBASE_INSTANCES
           .get(connectionKey);
@@ -303,11 +297,9 @@ public class HConnectionManager {
         connection.decCount();
         if (connection.isZeroReference() || staleConnection) {
           HBASE_INSTANCES.remove(connectionKey);
-          connection.close(stopProxy);
-        } else if (stopProxy) {
-          connection.stopProxyOnClose(stopProxy);
+          connection.internalClose();
         }
-      }else {
+      } else {
         LOG.error("Connection not found in the list, can't delete it "+
           "(connection key="+connectionKey+"). May be the key was modified?");
       }
@@ -549,6 +541,9 @@ public class HConnectionManager {
 
     private final Configuration conf;
 
+    // client RPC
+    private RpcClientEngine rpcEngine;
+
     // Known region ServerName.toString() -> RegionClient/Admin
     private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
       new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
@@ -575,7 +570,6 @@ public class HConnectionManager {
     private final Set<Integer> regionCachePrefetchDisabledTables =
       new CopyOnWriteArraySet<Integer>();
 
-    private boolean stopProxy;
     private int refCount;
 
     // indicates whether this connection's life cycle is managed (by us)
@@ -589,6 +583,9 @@ public class HConnectionManager {
     throws ZooKeeperConnectionException {
       this.conf = conf;
       this.managed = managed;
+      // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
+      // but we maintain access through an interface to allow overriding for tests
+      this.rpcEngine = new ProtobufRpcClientEngine(conf);
       String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
         DEFAULT_ADMIN_PROTOCOL_CLASS);
       this.closed = false;
@@ -716,7 +713,7 @@ public class HConnectionManager {
 
         InetSocketAddress isa =
           new InetSocketAddress(sn.getHostname(), sn.getPort());
-        MasterProtocol tryMaster = (MasterProtocol)HBaseClientRPC.getProxy(
+        MasterProtocol tryMaster = rpcEngine.getProxy(
             masterProtocolState.protocolClass,
             isa, this.conf, this.rpcTimeout);
 
@@ -724,7 +721,6 @@ public class HConnectionManager {
             null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
           return tryMaster;
         } else {
-          HBaseClientRPC.stopProxy(tryMaster);
           String msg = "Can create a proxy to master, but it is not running";
           LOG.info(msg);
           throw new MasterNotRunningException(msg);
@@ -897,7 +893,7 @@ public class HConnectionManager {
     @Override
     public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
       return locateRegion(HRegionInfo.getTableName(regionName),
-        HRegionInfo.getStartKey(regionName), false, true);
+          HRegionInfo.getStartKey(regionName), false, true);
     }
 
     @Override
@@ -1364,7 +1360,6 @@ public class HConnectionManager {
      * @param hostname
      * @param port
      * @param protocolClass
-     * @param version
      * @return Proxy.
      * @throws IOException
      */
@@ -1397,8 +1392,7 @@ public class HConnectionManager {
               // Only create isa when we need to.
               InetSocketAddress address = new InetSocketAddress(hostname, port);
               // definitely a cache miss. establish an RPC for this RS
-              server = HBaseClientRPC.waitForProxy(
-                  protocolClass, address, this.conf,
+              server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
                   this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
               protocols.put(protocol, server);
             } catch (RemoteException e) {
@@ -1611,9 +1605,6 @@ public class HConnectionManager {
         throws MasterNotRunningException {
       synchronized (masterAndZKLock) {
         if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
-          if (protocolState.protocol != null) {
-            HBaseClientRPC.stopProxy(protocolState.protocol);
-          }
           protocolState.protocol = null;
           protocolState.protocol = createMasterWithRetries(protocolState);
         }
@@ -1688,7 +1679,6 @@ public class HConnectionManager {
     private void closeMasterProtocol(MasterProtocolState protocolState) {
       if (protocolState.protocol != null){
         LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
-        HBaseClientRPC.stopProxy(protocolState.protocol);
         protocolState.protocol = null;
       }
       protocolState.userCount = 0;
@@ -2276,10 +2266,6 @@ public class HConnectionManager {
       }
     }
 
-    public void stopProxyOnClose(boolean stopProxy) {
-      this.stopProxy = stopProxy;
-    }
-
     /**
      * Increment this client's reference count.
      */
@@ -2305,21 +2291,15 @@ public class HConnectionManager {
       return refCount == 0;
     }
 
-    void close(boolean stopProxy) {
+    void internalClose() {
       if (this.closed) {
         return;
       }
       delayedClosing.stop("Closing connection");
-      if (stopProxy) {
-        closeMaster();
-        for (Map<String, IpcProtocol> i : servers.values()) {
-          for (IpcProtocol server: i.values()) {
-            HBaseClientRPC.stopProxy(server);
-          }
-        }
-      }
+      closeMaster();
       closeZooKeeperWatcher();
       this.servers.clear();
+      this.rpcEngine.close();
       this.closed = true;
     }
 
@@ -2329,10 +2309,10 @@ public class HConnectionManager {
         if (aborted) {
           HConnectionManager.deleteStaleConnection(this);
         } else {
-          HConnectionManager.deleteConnection(this, stopProxy, false);
+          HConnectionManager.deleteConnection(this, false);
         }
       } else {
-        close(true);
+        internalClose();
       }
     }
 
@@ -2419,6 +2399,14 @@ public class HConnectionManager {
       }
       throw new TableNotFoundException(Bytes.toString(tableName));
     }
+
+    /**
+     * Override the RpcClientEngine implementation used by this connection.
+     * <strong>FOR TESTING PURPOSES ONLY!</strong>
+     */
+    void setRpcEngine(RpcClientEngine engine) {
+      this.rpcEngine = engine;
+    }
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri Jan 18 23:32:39 2013
@@ -126,7 +126,6 @@ public class HBaseClient {
   protected FailedServers failedServers;
 
   protected final SocketFactory socketFactory;           // how to create sockets
-  private int refCount = 1;
   protected String clusterId;
 
   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -235,31 +234,6 @@ public class HBaseClient {
     return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
   }
 
-  /**
-   * Increment this client's reference count
-   *
-   */
-  synchronized void incCount() {
-    refCount++;
-  }
-
-  /**
-   * Decrement this client's reference count
-   *
-   */
-  synchronized void decCount() {
-    refCount--;
-  }
-
-  /**
-   * Return if this client has no reference
-   *
-   * @return true if this client has no reference; false otherwise
-   */
-  synchronized boolean isZeroReference() {
-    return refCount==0;
-  }
-
   /** A call waiting for a value. */
   protected class Call {
     final int id;                                 // call id

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java Fri Jan 18 23:32:39 2013
@@ -48,22 +48,6 @@ public class HBaseClientRPC {
   protected static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
 
-  /**
-   * Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine}
-   * implementation to load to handle connection protocols.  Handlers for individual
-   * protocols can be configured using {@code "hbase.rpc.client.engine." +
-   * protocol.class.name}.
-   */
-  public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine";
-
-  // cache of RpcEngines by protocol
-  private static final Map<Class<? extends IpcProtocol>, RpcClientEngine> PROTOCOL_ENGINES  =
-    new HashMap<Class<? extends IpcProtocol>, RpcClientEngine>();
-
-  // Track what RpcEngine is used by a proxy class, for stopProxy()
-  private static final Map<Class<?>, RpcClientEngine> PROXY_ENGINES =
-    new HashMap<Class<?>, RpcClientEngine>();
-
   // thread-specific RPC timeout, which may override that of RpcEngine
   private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
     @Override
@@ -72,41 +56,6 @@ public class HBaseClientRPC {
     }
   };
 
-  // set a protocol to use a non-default RpcEngine
-  static void setProtocolEngine(Configuration conf,
-      Class<? extends IpcProtocol> protocol, Class<? extends RpcClientEngine> engine) {
-    conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine,
-      RpcClientEngine.class);
-  }
-
-  // return the RpcEngine configured to handle a protocol
-  static synchronized RpcClientEngine getProtocolEngine(
-      Class<? extends IpcProtocol> protocol, Configuration conf) {
-    RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol);
-    if (engine == null) {
-      // check for a configured default engine
-      Class<?> defaultEngine =
-        conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
-
-      // check for a per interface override
-      Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
-        defaultEngine);
-      LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
-      engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf);
-      if (protocol.isInterface()) {
-        PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol),
-          engine);
-      }
-      PROTOCOL_ENGINES.put(protocol, engine);
-    }
-    return engine;
-  }
-
-  // return the RpcEngine that handles a proxy object
-  private static synchronized RpcClientEngine getProxyEngine(Object proxy) {
-    return PROXY_ENGINES.get(proxy.getClass());
-  }
-
   /**
    * @param protocol      protocol interface
    * @param addr          address of remote service
@@ -117,12 +66,13 @@ public class HBaseClientRPC {
    * @return proxy
    * @throws java.io.IOException e
    */
-  public static IpcProtocol waitForProxy(Class<? extends IpcProtocol> protocol,
-                                               InetSocketAddress addr,
-                                               Configuration conf,
-                                               int maxAttempts,
-                                               int rpcTimeout,
-                                               long timeout)
+  public static <T extends IpcProtocol> T waitForProxy(RpcClientEngine engine,
+                                         Class<T> protocol,
+                                         InetSocketAddress addr,
+                                         Configuration conf,
+                                         int maxAttempts,
+                                         int rpcTimeout,
+                                         long timeout)
   throws IOException {
     // HBase does limited number of reconnects which is different from hadoop.
     long startTime = System.currentTimeMillis();
@@ -130,7 +80,7 @@ public class HBaseClientRPC {
     int reconnectAttempts = 0;
     while (true) {
       try {
-        return getProxy(protocol, addr, conf, rpcTimeout);
+        return engine.getProxy(protocol, addr, conf, rpcTimeout);
       } catch (SocketTimeoutException te) {
        LOG.info("Problem connecting to server: " + addr);
         ioe = te;
@@ -194,77 +144,6 @@ public class HBaseClientRPC {
     }
   }
 
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address.
-   *
-   * @param protocol      interface
-   * @param addr          remote address
-   * @param conf          configuration
-   * @param factory       socket factory
-   * @param rpcTimeout    timeout for each RPC
-   * @return proxy
-   * @throws java.io.IOException e
-   */
-  public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
-                                           InetSocketAddress addr,
-                                           Configuration conf,
-                                           SocketFactory factory,
-                                           int rpcTimeout) throws IOException {
-    return getProxy(protocol, addr, User.getCurrent(), conf, factory, rpcTimeout);
-  }
-
-  /**
-   * Construct a client-side proxy object that implements the named protocol,
-   * talking to a server at the named address.
-   *
-   * @param protocol      interface
-   * @param addr          remote address
-   * @param ticket        ticket
-   * @param conf          configuration
-   * @param factory       socket factory
-   * @param rpcTimeout    timeout for each RPC
-   * @return proxy
-   * @throws java.io.IOException e
-   */
-  public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
-      InetSocketAddress addr, User ticket,
-      Configuration conf, SocketFactory factory, int rpcTimeout)
-      throws IOException {
-    RpcClientEngine engine = getProtocolEngine(protocol, conf);
-    IpcProtocol proxy = engine.getProxy(protocol, addr, ticket, conf, factory,
-      Math.min(rpcTimeout, getRpcTimeout()));
-    return proxy;
-  }
-
-  /**
-   * Construct a client-side proxy object with the default SocketFactory
-   *
-   * @param protocol      interface
-   * @param addr          remote address
-   * @param conf          configuration
-   * @param rpcTimeout    timeout for each RPC
-   * @return a proxy instance
-   * @throws java.io.IOException e
-   */
-  public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
-      InetSocketAddress addr, Configuration conf, int rpcTimeout)
-  throws IOException {
-    return getProxy(protocol, addr, conf,
-      NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
-  }
-
-  /**
-   * Stop this proxy and release its invoker's resource
-   *
-   * @param proxy the proxy to be stopped
-   */
-  public static void stopProxy(IpcProtocol proxy) {
-    if (proxy != null) {
-      getProxyEngine(proxy).stopProxy(proxy);
-    }
-  }
-
   public static void setRpcTimeout(int t) {
     rpcTimeout.set(t);
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java Fri Jan 18 23:32:39 2013
@@ -45,27 +45,24 @@ public class ProtobufRpcClientEngine imp
   private static final Log LOG =
       LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
 
-  ProtobufRpcClientEngine() {
-    super();
+  protected HBaseClient client;
+
+  public ProtobufRpcClientEngine(Configuration conf) {
+    this.client = new HBaseClient(conf);
   }
 
-  protected final static ClientCache CLIENTS = new ClientCache();
   @Override
-  public IpcProtocol getProxy(
-      Class<? extends IpcProtocol> protocol,
-      InetSocketAddress addr, User ticket, Configuration conf,
-      SocketFactory factory, int rpcTimeout) throws IOException {
-    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
-        rpcTimeout);
-    return (IpcProtocol) Proxy.newProxyInstance(
+  public <T extends IpcProtocol> T getProxy(
+      Class<T> protocol, InetSocketAddress addr,
+      Configuration conf, int rpcTimeout) throws IOException {
+    final Invoker invoker = new Invoker(protocol, addr, User.getCurrent(), rpcTimeout, client);
+    return (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[]{protocol}, invoker);
   }
 
   @Override
-  public void stopProxy(IpcProtocol proxy) {
-    if (proxy!=null) {
-      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
-    }
+  public void close() {
+    this.client.stop();
   }
 
   static class Invoker implements InvocationHandler {
@@ -75,16 +72,14 @@ public class ProtobufRpcClientEngine imp
     private InetSocketAddress address;
     private User ticket;
     private HBaseClient client;
-    private boolean isClosed = false;
     final private int rpcTimeout;
 
-    public Invoker(Class<? extends IpcProtocol> protocol,
-                   InetSocketAddress addr, User ticket, Configuration conf,
-                   SocketFactory factory, int rpcTimeout) throws IOException {
+    public Invoker(Class<? extends IpcProtocol> protocol, InetSocketAddress addr, User ticket,
+        int rpcTimeout, HBaseClient client) throws IOException {
       this.protocol = protocol;
       this.address = addr;
       this.ticket = ticket;
-      this.client = CLIENTS.getClient(conf, factory);
+      this.client = client;
       this.rpcTimeout = rpcTimeout;
     }
 
@@ -157,13 +152,6 @@ public class ProtobufRpcClientEngine imp
       }
     }
 
-    synchronized protected void close() {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
-
     static Message getReturnProtoType(Method method) throws Exception {
       if (returnTypes.containsKey(method.getName())) {
         return returnTypes.get(method.getName());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java Fri Jan 18 23:32:39 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.security.User;
 
-import javax.net.SocketFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -32,10 +31,9 @@ import java.net.InetSocketAddress;
 @InterfaceAudience.Private
 public interface RpcClientEngine {
   /** Construct a client-side proxy object. */
-  IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
-    InetSocketAddress addr, User ticket, Configuration conf,
-    SocketFactory factory, int rpcTimeout) throws IOException;
+  <T extends IpcProtocol> T getProxy(Class<T> protocol, InetSocketAddress addr,
+      Configuration conf, int rpcTimeout) throws IOException;
 
-  /** Stop this proxy. */
-  void stopProxy(IpcProtocol proxy);
+  /** Shutdown this instance */
+  void close();
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Jan 18 23:32:39 2013
@@ -107,6 +107,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -328,6 +330,9 @@ public class  HRegionServer implements C
   // unit tests.
   RpcServer rpcServer;
 
+  // RPC client for communicating with master
+  RpcClientEngine rpcClientEngine;
+
   private final InetSocketAddress isa;
   private UncaughtExceptionHandler uncaughtExceptionHandler;
 
@@ -841,6 +846,9 @@ public class  HRegionServer implements C
 
     // Create the thread to clean the moved regions list
     movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
+
+    // Setup RPC client for master communication
+    rpcClientEngine = new ProtobufRpcClientEngine(conf);
   }
 
   /**
@@ -989,9 +997,9 @@ public class  HRegionServer implements C
 
     // Make sure the proxy is down.
     if (this.hbaseMaster != null) {
-      HBaseClientRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
+    this.rpcClientEngine.close();
     this.leases.close();
 
     if (!killed) {
@@ -1860,10 +1868,8 @@ public class  HRegionServer implements C
       try {
         // Do initial RPC setup. The final argument indicates that the RPC
         // should retry indefinitely.
-        master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy(
-            RegionServerStatusProtocol.class,
-            isa, this.conf, -1,
-            this.rpcTimeout, this.rpcTimeout);
+        master = HBaseClientRPC.waitForProxy(rpcClientEngine, RegionServerStatusProtocol.class,
+            isa, this.conf, -1, this.rpcTimeout, this.rpcTimeout);
         LOG.info("Connected to master at " + isa);
       } catch (IOException e) {
         e = e instanceof RemoteException ?

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri Jan 18 23:32:39 2013
@@ -148,7 +148,7 @@ public class ReplicationLogCleaner exten
       this.zkHelper.getZookeeperWatcher().close();
     }
     // Not sure why we're deleting a connection that we never acquired or used
-    HConnectionManager.deleteConnection(this.getConf(), true);
+    HConnectionManager.deleteConnection(this.getConf());
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/hbase-default.xml?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/hbase-server/src/main/resources/hbase-default.xml Fri Jan 18 23:32:39 2013
@@ -530,13 +530,6 @@
   </property>
 
   <property>
-    <name>hbase.rpc.client.engine</name>
-    <value>org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine</value>
-    <description>Implementation of org.apache.hadoop.hbase.ipc.RpcClientEngine to be
-    used for client RPC call marshalling.
-    </description>
-  </property>
-  <property>
     <name>hbase.rpc.server.engine</name>
     <value>org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine</value>
     <description>Implementation of org.apache.hadoop.hbase.ipc.RpcServerEngine to be

Modified: hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp Fri Jan 18 23:32:39 2013
@@ -318,7 +318,7 @@
 }
 } // end else
 
-HConnectionManager.deleteConnection(hbadmin.getConfiguration(), false);
+HConnectionManager.deleteConnection(hbadmin.getConfiguration());
 %>
 
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Jan 18 23:32:39 2013
@@ -500,7 +500,7 @@ public class MiniHBaseCluster extends HB
     if (this.hbaseCluster != null) {
       this.hbaseCluster.shutdown();
     }
-    HConnectionManager.deleteAllConnections(false);
+    HConnectionManager.deleteAllConnections();
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Fri Jan 18 23:32:39 2013
@@ -177,7 +177,7 @@ public class TestCatalogTracker {
       // Join the thread... should exit shortly.
       t.join();
     } finally {
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
     }
   }
 
@@ -255,7 +255,7 @@ public class TestCatalogTracker {
       }
     } finally {
       // Clear out our doctored connection or could mess up subsequent tests.
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
     }
   }
 
@@ -282,7 +282,7 @@ public class TestCatalogTracker {
       }
     } finally {
       // Clear out our doctored connection or could mess up subsequent tests.
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
     }
   }
 
@@ -368,7 +368,7 @@ public class TestCatalogTracker {
       final CatalogTracker ct = constructAndStartCatalogTracker(connection);
       ct.waitForMeta(100);
     } finally {
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
     }
   }
 
@@ -460,7 +460,7 @@ public class TestCatalogTracker {
       // Now meta is available.
       Assert.assertTrue(ct.waitForMeta(10000).equals(SN));
     } finally {
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
     }
   }
 
@@ -475,7 +475,7 @@ public class TestCatalogTracker {
    * {@link HConnection#getAdmin(String, int)} is called, returns the passed
    * {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)}
    * is called (Be sure call
-   * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
+   * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
    * when done with this mocked Connection.
    * @throws IOException
    */

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Fri Jan 18 23:32:39 2013
@@ -208,7 +208,7 @@ public class TestMetaReaderEditorNoClust
         scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
     } finally {
       if (ct != null) ct.stop();
-      HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+      HConnectionManager.deleteConnection(UTIL.getConfiguration());
       zkw.close();
     }
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java Fri Jan 18 23:32:39 2013
@@ -49,8 +49,6 @@ public class TestClientTimeouts {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterAdminProtocol.class);
-    RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterMonitorProtocol.class);
     TEST_UTIL.startMiniCluster(SLAVES);
   }
 
@@ -73,26 +71,35 @@ public class TestClientTimeouts {
     HConnection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutRpcEngine.getNumberOfInvocations();
-    for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
-      lastFailed = false;
-      // Ensure the HBaseAdmin uses a new connection by changing Configuration.
-      Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
-      conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit);
-      try {
-        HBaseAdmin admin = new HBaseAdmin(conf);
-        HConnection connection = admin.getConnection();
-        assertFalse(connection == lastConnection);
-        // run some admin commands
-        HBaseAdmin.checkHBaseAvailable(conf);
-        admin.setBalancerRunning(false, false);
-      } catch (MasterNotRunningException ex) {
-        // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
-        // a MasterNotRunningException.  It's a bug if we get other exceptions.
-        lastFailed = true;
+
+    RandomTimeoutRpcEngine engine = new RandomTimeoutRpcEngine(TEST_UTIL.getConfiguration());
+    try {
+      for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
+        lastFailed = false;
+        // Ensure the HBaseAdmin uses a new connection by changing Configuration.
+        Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+        conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit);
+        try {
+          HBaseAdmin admin = new HBaseAdmin(conf);
+          HConnection connection = admin.getConnection();
+          assertFalse(connection == lastConnection);
+          lastConnection = connection;
+          // override the connection's rpc engine for timeout testing
+          ((HConnectionManager.HConnectionImplementation)connection).setRpcEngine(engine);
+          // run some admin commands
+          HBaseAdmin.checkHBaseAvailable(conf);
+          admin.setBalancerRunning(false, false);
+        } catch (MasterNotRunningException ex) {
+          // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
+          // a MasterNotRunningException.  It's a bug if we get other exceptions.
+          lastFailed = true;
+        }
       }
+      // Ensure the RandomTimeoutRpcEngine is actually being used.
+      assertFalse(lastFailed);
+      assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
+    } finally {
+      engine.close();
     }
-    // Ensure the RandomTimeoutRpcEngine is actually being used.
-    assertFalse(lastFailed);
-    assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
   }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Fri Jan 18 23:32:39 2013
@@ -285,7 +285,7 @@ public class TestFromClientSide {
      z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
 
 
-     HConnectionManager.deleteConnection(newConfig, true);
+     HConnectionManager.deleteConnection(newConfig);
      try {
        z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
        assertTrue("We should not have a valid connection for z2", false);
@@ -296,7 +296,7 @@ public class TestFromClientSide {
      // We expect success here.
 
 
-     HConnectionManager.deleteConnection(newConfig2, true);
+     HConnectionManager.deleteConnection(newConfig2);
      try {
        z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
        assertTrue("We should not have a valid connection for z4", false);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri Jan 18 23:32:39 2013
@@ -405,7 +405,7 @@ public class TestHCM {
     } finally {
       for (HConnection c: connections) {
         // Clean up connections made so we don't interfere w/ subsequent tests.
-        HConnectionManager.deleteConnection(c.getConfiguration(), true);
+        HConnectionManager.deleteConnection(c.getConfiguration());
       }
     }
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java Fri Jan 18 23:32:39 2013
@@ -46,29 +46,24 @@ public class RandomTimeoutRpcEngine exte
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   public static double chanceOfTimeout = 0.3;
   private static AtomicInteger invokations = new AtomicInteger();
-  
-  public IpcProtocol getProxy(
-      Class<? extends IpcProtocol> protocol,
-      InetSocketAddress addr, User ticket,
-      Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
+
+  public RandomTimeoutRpcEngine(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public <T extends IpcProtocol> T getProxy(
+      Class<T> protocol, InetSocketAddress addr, Configuration conf, int rpcTimeout)
+  throws IOException {
     // Start up the requested-for proxy so we can pass-through calls to the underlying
     // RpcEngine.  Also instantiate and return our own proxy (RandomTimeoutInvocationHandler)
     // that will either throw exceptions or pass through to the underlying proxy.
-    IpcProtocol actualProxy = super.getProxy(protocol, addr,
-      ticket, conf, factory, rpcTimeout);
+    T actualProxy = super.getProxy(protocol, addr, conf, rpcTimeout);
     RandomTimeoutInvocationHandler invoker =
       new RandomTimeoutInvocationHandler(actualProxy);
-    IpcProtocol object = (IpcProtocol)Proxy.newProxyInstance(
+    T wrapperProxy = (T)Proxy.newProxyInstance(
       protocol.getClassLoader(), new Class[]{protocol}, invoker);
-    return object;
-  }
-
-  /**
-   * Call this in order to set this class to run as the RpcEngine for the given protocol
-   */
-  public static void setProtocolEngine(Configuration conf,
-      Class<? extends IpcProtocol> protocol) {
-    HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
+    return wrapperProxy;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Fri Jan 18 23:32:39 2013
@@ -73,28 +73,33 @@ public class TestDelayedRpc {
         isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
     rpcServer.start();
 
-    TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
-        rpcServer.getListenerAddress(), conf, 1000);
+    ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+    try {
+      TestRpc client = clientEngine.getProxy(TestRpc.class,
+          rpcServer.getListenerAddress(), conf, 1000);
 
-    List<Integer> results = new ArrayList<Integer>();
+      List<Integer> results = new ArrayList<Integer>();
 
-    TestThread th1 = new TestThread(client, true, results);
-    TestThread th2 = new TestThread(client, false, results);
-    TestThread th3 = new TestThread(client, false, results);
-    th1.start();
-    Thread.sleep(100);
-    th2.start();
-    Thread.sleep(200);
-    th3.start();
-
-    th1.join();
-    th2.join();
-    th3.join();
-
-    assertEquals(UNDELAYED, results.get(0).intValue());
-    assertEquals(UNDELAYED, results.get(1).intValue());
-    assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
-        0xDEADBEEF);
+      TestThread th1 = new TestThread(client, true, results);
+      TestThread th2 = new TestThread(client, false, results);
+      TestThread th3 = new TestThread(client, false, results);
+      th1.start();
+      Thread.sleep(100);
+      th2.start();
+      Thread.sleep(200);
+      th3.start();
+
+      th1.join();
+      th2.join();
+      th3.join();
+
+      assertEquals(UNDELAYED, results.get(0).intValue());
+      assertEquals(UNDELAYED, results.get(1).intValue());
+      assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
+          0xDEADBEEF);
+    } finally {
+      clientEngine.close();
+    }
   }
 
   private static class ListAppender extends AppenderSkeleton {
@@ -136,32 +141,38 @@ public class TestDelayedRpc {
         new Class<?>[]{ TestRpcImpl.class },
         isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
     rpcServer.start();
-    TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
-        rpcServer.getListenerAddress(), conf, 1000);
 
-    Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
+    ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+    try {
+      TestRpc client = clientEngine.getProxy(TestRpc.class,
+          rpcServer.getListenerAddress(), conf, 1000);
 
-    for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-      threads[i] = new TestThread(client, true, null);
-      threads[i].start();
-    }
+      Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
 
-    /* No warnings till here. */
-    assertTrue(listAppender.getMessages().isEmpty());
+      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+        threads[i] = new TestThread(client, true, null);
+        threads[i].start();
+      }
 
-    /* This should give a warning. */
-    threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
-    threads[MAX_DELAYED_RPC].start();
+      /* No warnings till here. */
+      assertTrue(listAppender.getMessages().isEmpty());
 
-    for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-      threads[i].join();
-    }
+      /* This should give a warning. */
+      threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
+      threads[MAX_DELAYED_RPC].start();
 
-    assertFalse(listAppender.getMessages().isEmpty());
-    assertTrue(listAppender.getMessages().get(0).startsWith(
-        "Too many delayed calls"));
+      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+        threads[i].join();
+      }
 
-    log.removeAppender(listAppender);
+      assertFalse(listAppender.getMessages().isEmpty());
+      assertTrue(listAppender.getMessages().get(0).startsWith(
+          "Too many delayed calls"));
+
+      log.removeAppender(listAppender);
+    } finally {
+      clientEngine.close();
+    }
   }
 
   public interface TestRpc extends IpcProtocol {
@@ -178,7 +189,6 @@ public class TestDelayedRpc {
     /**
      * @param delayReturnValue Should the response to the delayed call be set
      * at the start or the end of the delay.
-     * @param delay Amount of milliseconds to delay the call by
      */
     public TestRpcImpl(boolean delayReturnValue) {
       this.delayReturnValue = delayReturnValue;
@@ -251,29 +261,34 @@ public class TestDelayedRpc {
         isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
     rpcServer.start();
 
-    TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
-        rpcServer.getListenerAddress(), conf, 1000);
+    ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+    try {
+      TestRpc client = clientEngine.getProxy(TestRpc.class,
+          rpcServer.getListenerAddress(), conf, 1000);
 
-    int result = 0xDEADBEEF;
+      int result = 0xDEADBEEF;
 
-    try {
-      result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
-    } catch (Exception e) {
-      fail("No exception should have been thrown.");
-    }
-    assertEquals(result, UNDELAYED);
+      try {
+        result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
+      } catch (Exception e) {
+        fail("No exception should have been thrown.");
+      }
+      assertEquals(result, UNDELAYED);
 
-    boolean caughtException = false;
-    try {
-      result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
-    } catch(Exception e) {
-      // Exception thrown by server is enclosed in a RemoteException.
-      if (e.getCause().getMessage().contains(
-          "java.lang.Exception: Something went wrong"))
-        caughtException = true;
-      Log.warn(e);
+      boolean caughtException = false;
+      try {
+        result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
+      } catch(Exception e) {
+        // Exception thrown by server is enclosed in a RemoteException.
+        if (e.getCause().getMessage().contains(
+            "java.lang.Exception: Something went wrong"))
+          caughtException = true;
+        Log.warn(e);
+      }
+      assertTrue(caughtException);
+    } finally {
+      clientEngine.close();
     }
-    assertTrue(caughtException);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java Fri Jan 18 23:32:39 2013
@@ -82,9 +82,6 @@ public class TestProtoBufRpc {
   @Before
   public  void setUp() throws IOException { // Setup server for both protocols
     conf = new Configuration();
-    // Set RPC engine to protobuf RPC engine
-    HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
-    HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
 
     // Create server side implementation
     PBServerImpl serverImpl = new PBServerImpl();
@@ -102,38 +99,29 @@ public class TestProtoBufRpc {
     server.stop();
   }
 
-  private static TestRpcService getClient() throws IOException {
-    // Set RPC engine to protobuf RPC engine
-    HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
-    HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
-
-    return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class,
-        addr, conf, 10000);
-  }
-
   @Test
   public void testProtoBufRpc() throws Exception {
-    TestRpcService client = getClient();
-    testProtoBufRpc(client);
-  }
-  
-  // separated test out so that other tests can call it.
-  public static void testProtoBufRpc(TestRpcService client) throws Exception {  
-    // Test ping method
-    EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
-    client.ping(null, emptyRequest);
-    
-    // Test echo method
-    EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
-        .setMessage("hello").build();
-    EchoResponseProto echoResponse = client.echo(null, echoRequest);
-    Assert.assertEquals(echoResponse.getMessage(), "hello");
-    
-    // Test error method - error should be thrown as RemoteException
+    ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
     try {
-      client.error(null, emptyRequest);
-      Assert.fail("Expected exception is not thrown");
-    } catch (ServiceException e) {
+      TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000);
+      // Test ping method
+      EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+      client.ping(null, emptyRequest);
+
+      // Test echo method
+      EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+          .setMessage("hello").build();
+      EchoResponseProto echoResponse = client.echo(null, echoRequest);
+      Assert.assertEquals(echoResponse.getMessage(), "hello");
+
+      // Test error method - error should be thrown as RemoteException
+      try {
+        client.error(null, emptyRequest);
+        Assert.fail("Expected exception is not thrown");
+      } catch (ServiceException e) {
+      }
+    } finally {
+      clientEngine.close();
     }
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Fri Jan 18 23:32:39 2013
@@ -160,7 +160,7 @@ public class TestCatalogJanitor {
         this.ct.stop();
       }
       if (this.connection != null) {
-        HConnectionManager.deleteConnection(this.connection.getConfiguration(), true);
+        HConnectionManager.deleteConnection(this.connection.getConfiguration());
       }
     }
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java Fri Jan 18 23:32:39 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
 import org.apache.hadoop.hbase.MasterMonitorProtocol;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
 import org.junit.Test;
@@ -50,31 +51,36 @@ public class TestHMasterRPCException {
 
     ServerName sm = hm.getServerName();
     InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
-    int i = 0;
-    //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
-    //try to connect too soon. Retry on SocketTimeoutException.
-    while (i < 20) { 
-      try {
-        MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy(
-            MasterMonitorProtocol.class, isa, conf, 100 * 10);
-        inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
-        fail();
-      } catch (ServiceException ex) {
-        IOException ie = ProtobufUtil.getRemoteException(ex);
-        if (!(ie instanceof SocketTimeoutException)) {
-          if(ie.getMessage().startsWith(
-              "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
-            return;
+    ProtobufRpcClientEngine engine = new ProtobufRpcClientEngine(conf);
+    try {
+      int i = 0;
+      //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
+      //try to connect too soon. Retry on SocketTimeoutException.
+      while (i < 20) {
+        try {
+          MasterMonitorProtocol inf = engine.getProxy(
+              MasterMonitorProtocol.class, isa, conf, 100 * 10);
+          inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
+          fail();
+        } catch (ServiceException ex) {
+          IOException ie = ProtobufUtil.getRemoteException(ex);
+          if (!(ie instanceof SocketTimeoutException)) {
+            if(ie.getMessage().startsWith(
+                "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
+              return;
+            }
+          } else {
+            System.err.println("Got SocketTimeoutException. Will retry. ");
           }
-        } else {
-          System.err.println("Got SocketTimeoutException. Will retry. ");
+        } catch (Throwable t) {
+          fail("Unexpected throwable: " + t);
         }
-      } catch (Throwable t) {
-        fail("Unexpected throwable: " + t);
+        Thread.sleep(100);
+        i++;
       }
-      Thread.sleep(100);
-      i++;
+      fail();
+    } finally {
+      engine.close();
     }
-    fail();
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Fri Jan 18 23:32:39 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ipc.Block
 import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -363,20 +364,25 @@ public class TestTokenAuthentication {
       public Object run() throws Exception {
         Configuration c = server.getConfiguration();
         c.set(HConstants.CLUSTER_ID, clusterId.toString());
-        AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
-            (AuthenticationProtos.AuthenticationService.BlockingInterface)
-            HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class,
-                server.getAddress(), c,
-                HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
-                HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
-                HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
-        AuthenticationProtos.WhoAmIResponse response =
-            proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
-        String myname = response.getUsername();
-        assertEquals("testuser", myname);
-        String authMethod = response.getAuthMethod();
-        assertEquals("TOKEN", authMethod);
+        ProtobufRpcClientEngine rpcClient =
+            new ProtobufRpcClientEngine(c);
+        try {
+          AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
+              HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class,
+                  server.getAddress(), c,
+                  HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
+                  HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
+                  HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
+          AuthenticationProtos.WhoAmIResponse response =
+              proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
+          String myname = response.getUsername();
+          assertEquals("testuser", myname);
+          String authMethod = response.getAuthMethod();
+          assertEquals("TOKEN", authMethod);
+        } finally {
+          rpcClient.close();
+        }
         return null;
       }
     });

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java Fri Jan 18 23:32:39 2013
@@ -108,7 +108,7 @@ public class OfflineMetaRebuildTestCore 
   @After
   public void tearDownAfter() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-    HConnectionManager.deleteConnection(conf, true);
+    HConnectionManager.deleteConnection(conf);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java Fri Jan 18 23:32:39 2013
@@ -57,7 +57,7 @@ public class TestOfflineMetaRebuildBase 
     // shutdown the minicluster
     TEST_UTIL.shutdownMiniHBaseCluster();
     TEST_UTIL.shutdownMiniZKCluster();
-    HConnectionManager.deleteConnection(conf, false);
+    HConnectionManager.deleteConnection(conf);
 
     // rebuild meta table from scratch
     HBaseFsck fsck = new HBaseFsck(conf);