You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2013/01/19 00:32:40 UTC
svn commit: r1435414 - in /hbase/trunk/hbase-server/src:
main/jamon/org/apache/hadoop/hbase/tmpl/master/
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apac...
Author: garyh
Date: Fri Jan 18 23:32:39 2013
New Revision: 1435414
URL: http://svn.apache.org/viewvc?rev=1435414&view=rev
Log:
HBASE-7460 Cleanup RPC client connection layers
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
Modified:
hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java
Modified: hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon (original)
+++ hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon Fri Jan 18 23:32:39 2013
@@ -283,7 +283,7 @@ org.apache.hadoop.hbase.HBaseConfigurati
<%def userTables>
<%java>
HTableDescriptor[] tables = admin.listTables();
- HConnectionManager.deleteConnection(admin.getConfiguration(), false);
+ HConnectionManager.deleteConnection(admin.getConfiguration());
</%java>
<%if (tables != null && tables.length > 0)%>
<table class="table table-striped">
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Jan 18 23:32:39 2013
@@ -75,6 +75,8 @@ import org.apache.hadoop.hbase.client.Me
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
@@ -117,7 +119,7 @@ import com.google.protobuf.ServiceExcept
* <p>But sharing connections
* makes clean up of {@link HConnection} instances a little awkward. Currently,
* clients cleanup by calling
- * {@link #deleteConnection(Configuration, boolean)}. This will shutdown the
+ * {@link #deleteConnection(Configuration)}. This will shutdown the
* zookeeper connection the HConnection was using and clean up all
* HConnection resources as well as stopping proxies to servers out on the
* cluster. Not running the cleanup will not end the world; it'll
@@ -138,7 +140,7 @@ import com.google.protobuf.ServiceExcept
* }
* </pre>
* <p>Cleanup used to be done inside in a shutdown hook. On startup we'd
- * register a shutdown hook that called {@link #deleteAllConnections(boolean)}
+ * register a shutdown hook that called {@link #deleteAllConnections()}
* on its way out but the order in which shutdown hooks run is not defined so
* were problematic for clients of HConnection that wanted to register their
* own shutdown hooks so we removed ours though this shifts the onus for
@@ -212,7 +214,7 @@ public class HConnectionManager {
connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection);
} else if (connection.isClosed()) {
- HConnectionManager.deleteConnection(connectionKey, true, true);
+ HConnectionManager.deleteConnection(connectionKey, true);
connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection);
}
@@ -244,14 +246,9 @@ public class HConnectionManager {
* @param conf
* configuration whose identity is used to find {@link HConnection}
* instance.
- * @param stopProxy
- * Shuts down all the proxy's put up to cluster members including to
- * cluster HMaster. Calls
- * {@link HBaseClientRPC#stopProxy(IpcProtocol)}
- * .
*/
- public static void deleteConnection(Configuration conf, boolean stopProxy) {
- deleteConnection(new HConnectionKey(conf), stopProxy, false);
+ public static void deleteConnection(Configuration conf) {
+ deleteConnection(new HConnectionKey(conf), false);
}
/**
@@ -262,40 +259,37 @@ public class HConnectionManager {
* @param connection
*/
public static void deleteStaleConnection(HConnection connection) {
- deleteConnection(connection, true, true);
+ deleteConnection(connection, true);
}
/**
* Delete information for all connections.
- * @param stopProxy stop the proxy as well
* @throws IOException
*/
- public static void deleteAllConnections(boolean stopProxy) {
+ public static void deleteAllConnections() {
synchronized (HBASE_INSTANCES) {
Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
connectionKeys.addAll(HBASE_INSTANCES.keySet());
for (HConnectionKey connectionKey : connectionKeys) {
- deleteConnection(connectionKey, stopProxy, false);
+ deleteConnection(connectionKey, false);
}
HBASE_INSTANCES.clear();
}
}
- private static void deleteConnection(HConnection connection, boolean stopProxy,
- boolean staleConnection) {
+ private static void deleteConnection(HConnection connection, boolean staleConnection) {
synchronized (HBASE_INSTANCES) {
for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
.entrySet()) {
if (connectionEntry.getValue() == connection) {
- deleteConnection(connectionEntry.getKey(), stopProxy, staleConnection);
+ deleteConnection(connectionEntry.getKey(), staleConnection);
break;
}
}
}
}
- private static void deleteConnection(HConnectionKey connectionKey,
- boolean stopProxy, boolean staleConnection) {
+ private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
synchronized (HBASE_INSTANCES) {
HConnectionImplementation connection = HBASE_INSTANCES
.get(connectionKey);
@@ -303,11 +297,9 @@ public class HConnectionManager {
connection.decCount();
if (connection.isZeroReference() || staleConnection) {
HBASE_INSTANCES.remove(connectionKey);
- connection.close(stopProxy);
- } else if (stopProxy) {
- connection.stopProxyOnClose(stopProxy);
+ connection.internalClose();
}
- }else {
+ } else {
LOG.error("Connection not found in the list, can't delete it "+
"(connection key="+connectionKey+"). May be the key was modified?");
}
@@ -549,6 +541,9 @@ public class HConnectionManager {
private final Configuration conf;
+ // client RPC
+ private RpcClientEngine rpcEngine;
+
// Known region ServerName.toString() -> RegionClient/Admin
private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
@@ -575,7 +570,6 @@ public class HConnectionManager {
private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>();
- private boolean stopProxy;
private int refCount;
// indicates whether this connection's life cycle is managed (by us)
@@ -589,6 +583,9 @@ public class HConnectionManager {
throws ZooKeeperConnectionException {
this.conf = conf;
this.managed = managed;
+ // ProtobufRpcClientEngine is the main RpcClientEngine implementation,
+ // but we maintain access through an interface to allow overriding for tests
+ this.rpcEngine = new ProtobufRpcClientEngine(conf);
String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
DEFAULT_ADMIN_PROTOCOL_CLASS);
this.closed = false;
@@ -716,7 +713,7 @@ public class HConnectionManager {
InetSocketAddress isa =
new InetSocketAddress(sn.getHostname(), sn.getPort());
- MasterProtocol tryMaster = (MasterProtocol)HBaseClientRPC.getProxy(
+ MasterProtocol tryMaster = rpcEngine.getProxy(
masterProtocolState.protocolClass,
isa, this.conf, this.rpcTimeout);
@@ -724,7 +721,6 @@ public class HConnectionManager {
null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
return tryMaster;
} else {
- HBaseClientRPC.stopProxy(tryMaster);
String msg = "Can create a proxy to master, but it is not running";
LOG.info(msg);
throw new MasterNotRunningException(msg);
@@ -897,7 +893,7 @@ public class HConnectionManager {
@Override
public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
return locateRegion(HRegionInfo.getTableName(regionName),
- HRegionInfo.getStartKey(regionName), false, true);
+ HRegionInfo.getStartKey(regionName), false, true);
}
@Override
@@ -1364,7 +1360,6 @@ public class HConnectionManager {
* @param hostname
* @param port
* @param protocolClass
- * @param version
* @return Proxy.
* @throws IOException
*/
@@ -1397,8 +1392,7 @@ public class HConnectionManager {
// Only create isa when we need to.
InetSocketAddress address = new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS
- server = HBaseClientRPC.waitForProxy(
- protocolClass, address, this.conf,
+ server = HBaseClientRPC.waitForProxy(rpcEngine, protocolClass, address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
protocols.put(protocol, server);
} catch (RemoteException e) {
@@ -1611,9 +1605,6 @@ public class HConnectionManager {
throws MasterNotRunningException {
synchronized (masterAndZKLock) {
if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
- if (protocolState.protocol != null) {
- HBaseClientRPC.stopProxy(protocolState.protocol);
- }
protocolState.protocol = null;
protocolState.protocol = createMasterWithRetries(protocolState);
}
@@ -1688,7 +1679,6 @@ public class HConnectionManager {
private void closeMasterProtocol(MasterProtocolState protocolState) {
if (protocolState.protocol != null){
LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
- HBaseClientRPC.stopProxy(protocolState.protocol);
protocolState.protocol = null;
}
protocolState.userCount = 0;
@@ -2276,10 +2266,6 @@ public class HConnectionManager {
}
}
- public void stopProxyOnClose(boolean stopProxy) {
- this.stopProxy = stopProxy;
- }
-
/**
* Increment this client's reference count.
*/
@@ -2305,21 +2291,15 @@ public class HConnectionManager {
return refCount == 0;
}
- void close(boolean stopProxy) {
+ void internalClose() {
if (this.closed) {
return;
}
delayedClosing.stop("Closing connection");
- if (stopProxy) {
- closeMaster();
- for (Map<String, IpcProtocol> i : servers.values()) {
- for (IpcProtocol server: i.values()) {
- HBaseClientRPC.stopProxy(server);
- }
- }
- }
+ closeMaster();
closeZooKeeperWatcher();
this.servers.clear();
+ this.rpcEngine.close();
this.closed = true;
}
@@ -2329,10 +2309,10 @@ public class HConnectionManager {
if (aborted) {
HConnectionManager.deleteStaleConnection(this);
} else {
- HConnectionManager.deleteConnection(this, stopProxy, false);
+ HConnectionManager.deleteConnection(this, false);
}
} else {
- close(true);
+ internalClose();
}
}
@@ -2419,6 +2399,14 @@ public class HConnectionManager {
}
throw new TableNotFoundException(Bytes.toString(tableName));
}
+
+ /**
+ * Override the RpcClientEngine implementation used by this connection.
+ * <strong>FOR TESTING PURPOSES ONLY!</strong>
+ */
+ void setRpcEngine(RpcClientEngine engine) {
+ this.rpcEngine = engine;
+ }
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri Jan 18 23:32:39 2013
@@ -126,7 +126,6 @@ public class HBaseClient {
protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
protected String clusterId;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -235,31 +234,6 @@ public class HBaseClient {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
}
- /**
- * Increment this client's reference count
- *
- */
- synchronized void incCount() {
- refCount++;
- }
-
- /**
- * Decrement this client's reference count
- *
- */
- synchronized void decCount() {
- refCount--;
- }
-
- /**
- * Return if this client has no reference
- *
- * @return true if this client has no reference; false otherwise
- */
- synchronized boolean isZeroReference() {
- return refCount==0;
- }
-
/** A call waiting for a value. */
protected class Call {
final int id; // call id
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java Fri Jan 18 23:32:39 2013
@@ -48,22 +48,6 @@ public class HBaseClientRPC {
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
- /**
- * Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine}
- * implementation to load to handle connection protocols. Handlers for individual
- * protocols can be configured using {@code "hbase.rpc.client.engine." +
- * protocol.class.name}.
- */
- public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine";
-
- // cache of RpcEngines by protocol
- private static final Map<Class<? extends IpcProtocol>, RpcClientEngine> PROTOCOL_ENGINES =
- new HashMap<Class<? extends IpcProtocol>, RpcClientEngine>();
-
- // Track what RpcEngine is used by a proxy class, for stopProxy()
- private static final Map<Class<?>, RpcClientEngine> PROXY_ENGINES =
- new HashMap<Class<?>, RpcClientEngine>();
-
// thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
@@ -72,41 +56,6 @@ public class HBaseClientRPC {
}
};
- // set a protocol to use a non-default RpcEngine
- static void setProtocolEngine(Configuration conf,
- Class<? extends IpcProtocol> protocol, Class<? extends RpcClientEngine> engine) {
- conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine,
- RpcClientEngine.class);
- }
-
- // return the RpcEngine configured to handle a protocol
- static synchronized RpcClientEngine getProtocolEngine(
- Class<? extends IpcProtocol> protocol, Configuration conf) {
- RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol);
- if (engine == null) {
- // check for a configured default engine
- Class<?> defaultEngine =
- conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
-
- // check for a per interface override
- Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
- defaultEngine);
- LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
- engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf);
- if (protocol.isInterface()) {
- PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol),
- engine);
- }
- PROTOCOL_ENGINES.put(protocol, engine);
- }
- return engine;
- }
-
- // return the RpcEngine that handles a proxy object
- private static synchronized RpcClientEngine getProxyEngine(Object proxy) {
- return PROXY_ENGINES.get(proxy.getClass());
- }
-
/**
* @param protocol protocol interface
* @param addr address of remote service
@@ -117,12 +66,13 @@ public class HBaseClientRPC {
* @return proxy
* @throws java.io.IOException e
*/
- public static IpcProtocol waitForProxy(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr,
- Configuration conf,
- int maxAttempts,
- int rpcTimeout,
- long timeout)
+ public static <T extends IpcProtocol> T waitForProxy(RpcClientEngine engine,
+ Class<T> protocol,
+ InetSocketAddress addr,
+ Configuration conf,
+ int maxAttempts,
+ int rpcTimeout,
+ long timeout)
throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
long startTime = System.currentTimeMillis();
@@ -130,7 +80,7 @@ public class HBaseClientRPC {
int reconnectAttempts = 0;
while (true) {
try {
- return getProxy(protocol, addr, conf, rpcTimeout);
+ return engine.getProxy(protocol, addr, conf, rpcTimeout);
} catch (SocketTimeoutException te) {
LOG.info("Problem connecting to server: " + addr);
ioe = te;
@@ -194,77 +144,6 @@ public class HBaseClientRPC {
}
}
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param protocol interface
- * @param addr remote address
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout timeout for each RPC
- * @return proxy
- * @throws java.io.IOException e
- */
- public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr,
- Configuration conf,
- SocketFactory factory,
- int rpcTimeout) throws IOException {
- return getProxy(protocol, addr, User.getCurrent(), conf, factory, rpcTimeout);
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param protocol interface
- * @param addr remote address
- * @param ticket ticket
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout timeout for each RPC
- * @return proxy
- * @throws java.io.IOException e
- */
- public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
- throws IOException {
- RpcClientEngine engine = getProtocolEngine(protocol, conf);
- IpcProtocol proxy = engine.getProxy(protocol, addr, ticket, conf, factory,
- Math.min(rpcTimeout, getRpcTimeout()));
- return proxy;
- }
-
- /**
- * Construct a client-side proxy object with the default SocketFactory
- *
- * @param protocol interface
- * @param addr remote address
- * @param conf configuration
- * @param rpcTimeout timeout for each RPC
- * @return a proxy instance
- * @throws java.io.IOException e
- */
- public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, Configuration conf, int rpcTimeout)
- throws IOException {
- return getProxy(protocol, addr, conf,
- NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
- }
-
- /**
- * Stop this proxy and release its invoker's resource
- *
- * @param proxy the proxy to be stopped
- */
- public static void stopProxy(IpcProtocol proxy) {
- if (proxy != null) {
- getProxyEngine(proxy).stopProxy(proxy);
- }
- }
-
public static void setRpcTimeout(int t) {
rpcTimeout.set(t);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java Fri Jan 18 23:32:39 2013
@@ -45,27 +45,24 @@ public class ProtobufRpcClientEngine imp
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
- ProtobufRpcClientEngine() {
- super();
+ protected HBaseClient client;
+
+ public ProtobufRpcClientEngine(Configuration conf) {
+ this.client = new HBaseClient(conf);
}
- protected final static ClientCache CLIENTS = new ClientCache();
@Override
- public IpcProtocol getProxy(
- Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, User ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException {
- final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
- rpcTimeout);
- return (IpcProtocol) Proxy.newProxyInstance(
+ public <T extends IpcProtocol> T getProxy(
+ Class<T> protocol, InetSocketAddress addr,
+ Configuration conf, int rpcTimeout) throws IOException {
+ final Invoker invoker = new Invoker(protocol, addr, User.getCurrent(), rpcTimeout, client);
+ return (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
}
@Override
- public void stopProxy(IpcProtocol proxy) {
- if (proxy!=null) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
+ public void close() {
+ this.client.stop();
}
static class Invoker implements InvocationHandler {
@@ -75,16 +72,14 @@ public class ProtobufRpcClientEngine imp
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
- private boolean isClosed = false;
final private int rpcTimeout;
- public Invoker(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, User ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException {
+ public Invoker(Class<? extends IpcProtocol> protocol, InetSocketAddress addr, User ticket,
+ int rpcTimeout, HBaseClient client) throws IOException {
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory);
+ this.client = client;
this.rpcTimeout = rpcTimeout;
}
@@ -157,13 +152,6 @@ public class ProtobufRpcClientEngine imp
}
}
- synchronized protected void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
-
static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java Fri Jan 18 23:32:39 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
-import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -32,10 +31,9 @@ import java.net.InetSocketAddress;
@InterfaceAudience.Private
public interface RpcClientEngine {
/** Construct a client-side proxy object. */
- IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, User ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException;
+ <T extends IpcProtocol> T getProxy(Class<T> protocol, InetSocketAddress addr,
+ Configuration conf, int rpcTimeout) throws IOException;
- /** Stop this proxy. */
- void stopProxy(IpcProtocol proxy);
+ /** Shutdown this instance */
+ void close();
}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Jan 18 23:32:39 2013
@@ -107,6 +107,8 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
+import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -328,6 +330,9 @@ public class HRegionServer implements C
// unit tests.
RpcServer rpcServer;
+ // RPC client for communicating with master
+ RpcClientEngine rpcClientEngine;
+
private final InetSocketAddress isa;
private UncaughtExceptionHandler uncaughtExceptionHandler;
@@ -841,6 +846,9 @@ public class HRegionServer implements C
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
+
+ // Setup RPC client for master communication
+ rpcClientEngine = new ProtobufRpcClientEngine(conf);
}
/**
@@ -989,9 +997,9 @@ public class HRegionServer implements C
// Make sure the proxy is down.
if (this.hbaseMaster != null) {
- HBaseClientRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
+ this.rpcClientEngine.close();
this.leases.close();
if (!killed) {
@@ -1860,10 +1868,8 @@ public class HRegionServer implements C
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
- master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy(
- RegionServerStatusProtocol.class,
- isa, this.conf, -1,
- this.rpcTimeout, this.rpcTimeout);
+ master = HBaseClientRPC.waitForProxy(rpcClientEngine, RegionServerStatusProtocol.class,
+ isa, this.conf, -1, this.rpcTimeout, this.rpcTimeout);
LOG.info("Connected to master at " + isa);
} catch (IOException e) {
e = e instanceof RemoteException ?
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri Jan 18 23:32:39 2013
@@ -148,7 +148,7 @@ public class ReplicationLogCleaner exten
this.zkHelper.getZookeeperWatcher().close();
}
// Not sure why we're deleting a connection that we never acquired or used
- HConnectionManager.deleteConnection(this.getConf(), true);
+ HConnectionManager.deleteConnection(this.getConf());
}
@Override
Modified: hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/hbase-default.xml?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/resources/hbase-default.xml (original)
+++ hbase/trunk/hbase-server/src/main/resources/hbase-default.xml Fri Jan 18 23:32:39 2013
@@ -530,13 +530,6 @@
</property>
<property>
- <name>hbase.rpc.client.engine</name>
- <value>org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine</value>
- <description>Implementation of org.apache.hadoop.hbase.ipc.RpcClientEngine to be
- used for client RPC call marshalling.
- </description>
- </property>
- <property>
<name>hbase.rpc.server.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcServerEngine to be
Modified: hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/trunk/hbase-server/src/main/resources/hbase-webapps/master/table.jsp Fri Jan 18 23:32:39 2013
@@ -318,7 +318,7 @@
}
} // end else
-HConnectionManager.deleteConnection(hbadmin.getConfiguration(), false);
+HConnectionManager.deleteConnection(hbadmin.getConfiguration());
%>
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Jan 18 23:32:39 2013
@@ -500,7 +500,7 @@ public class MiniHBaseCluster extends HB
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
}
- HConnectionManager.deleteAllConnections(false);
+ HConnectionManager.deleteAllConnections();
}
@Override
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Fri Jan 18 23:32:39 2013
@@ -177,7 +177,7 @@ public class TestCatalogTracker {
// Join the thread... should exit shortly.
t.join();
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -255,7 +255,7 @@ public class TestCatalogTracker {
}
} finally {
// Clear out our doctored connection or could mess up subsequent tests.
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -282,7 +282,7 @@ public class TestCatalogTracker {
}
} finally {
// Clear out our doctored connection or could mess up subsequent tests.
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -368,7 +368,7 @@ public class TestCatalogTracker {
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ct.waitForMeta(100);
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -460,7 +460,7 @@ public class TestCatalogTracker {
// Now meta is available.
Assert.assertTrue(ct.waitForMeta(10000).equals(SN));
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -475,7 +475,7 @@ public class TestCatalogTracker {
* {@link HConnection#getAdmin(String, int)} is called, returns the passed
* {@link ClientProtocol} instance when {@link HConnection#getClient(String, int)}
* is called (Be sure call
- * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
+ * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
* when done with this mocked Connection.
* @throws IOException
*/
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Fri Jan 18 23:32:39 2013
@@ -208,7 +208,7 @@ public class TestMetaReaderEditorNoClust
scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
} finally {
if (ct != null) ct.stop();
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
zkw.close();
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java Fri Jan 18 23:32:39 2013
@@ -49,8 +49,6 @@ public class TestClientTimeouts {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
- RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterAdminProtocol.class);
- RandomTimeoutRpcEngine.setProtocolEngine(conf, MasterMonitorProtocol.class);
TEST_UTIL.startMiniCluster(SLAVES);
}
@@ -73,26 +71,35 @@ public class TestClientTimeouts {
HConnection lastConnection = null;
boolean lastFailed = false;
int initialInvocations = RandomTimeoutRpcEngine.getNumberOfInvocations();
- for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
- lastFailed = false;
- // Ensure the HBaseAdmin uses a new connection by changing Configuration.
- Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
- conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit);
- try {
- HBaseAdmin admin = new HBaseAdmin(conf);
- HConnection connection = admin.getConnection();
- assertFalse(connection == lastConnection);
- // run some admin commands
- HBaseAdmin.checkHBaseAvailable(conf);
- admin.setBalancerRunning(false, false);
- } catch (MasterNotRunningException ex) {
- // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
- // a MasterNotRunningException. It's a bug if we get other exceptions.
- lastFailed = true;
+
+ RandomTimeoutRpcEngine engine = new RandomTimeoutRpcEngine(TEST_UTIL.getConfiguration());
+ try {
+ for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
+ lastFailed = false;
+ // Ensure the HBaseAdmin uses a new connection by changing Configuration.
+ Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ conf.setLong(HConstants.HBASE_CLIENT_PREFETCH_LIMIT, ++lastLimit);
+ try {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HConnection connection = admin.getConnection();
+ assertFalse(connection == lastConnection);
+ lastConnection = connection;
+ // override the connection's rpc engine for timeout testing
+ ((HConnectionManager.HConnectionImplementation)connection).setRpcEngine(engine);
+ // run some admin commands
+ HBaseAdmin.checkHBaseAvailable(conf);
+ admin.setBalancerRunning(false, false);
+ } catch (MasterNotRunningException ex) {
+ // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
+ // a MasterNotRunningException. It's a bug if we get other exceptions.
+ lastFailed = true;
+ }
}
+ // Ensure the RandomTimeoutRpcEngine is actually being used.
+ assertFalse(lastFailed);
+ assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
+ } finally {
+ engine.close();
}
- // Ensure the RandomTimeoutRpcEngine is actually being used.
- assertFalse(lastFailed);
- assertTrue(RandomTimeoutRpcEngine.getNumberOfInvocations() > initialInvocations);
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Fri Jan 18 23:32:39 2013
@@ -285,7 +285,7 @@ public class TestFromClientSide {
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
- HConnectionManager.deleteConnection(newConfig, true);
+ HConnectionManager.deleteConnection(newConfig);
try {
z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
assertTrue("We should not have a valid connection for z2", false);
@@ -296,7 +296,7 @@ public class TestFromClientSide {
// We expect success here.
- HConnectionManager.deleteConnection(newConfig2, true);
+ HConnectionManager.deleteConnection(newConfig2);
try {
z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
assertTrue("We should not have a valid connection for z4", false);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri Jan 18 23:32:39 2013
@@ -405,7 +405,7 @@ public class TestHCM {
} finally {
for (HConnection c: connections) {
// Clean up connections made so we don't interfere w/ subsequent tests.
- HConnectionManager.deleteConnection(c.getConfiguration(), true);
+ HConnectionManager.deleteConnection(c.getConfiguration());
}
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java Fri Jan 18 23:32:39 2013
@@ -46,29 +46,24 @@ public class RandomTimeoutRpcEngine exte
private static final Random RANDOM = new Random(System.currentTimeMillis());
public static double chanceOfTimeout = 0.3;
private static AtomicInteger invokations = new AtomicInteger();
-
- public IpcProtocol getProxy(
- Class<? extends IpcProtocol> protocol,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
+
+ public RandomTimeoutRpcEngine(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public <T extends IpcProtocol> T getProxy(
+ Class<T> protocol, InetSocketAddress addr, Configuration conf, int rpcTimeout)
+ throws IOException {
// Start up the requested-for proxy so we can pass-through calls to the underlying
// RpcEngine. Also instantiate and return our own proxy (RandomTimeoutInvocationHandler)
// that will either throw exceptions or pass through to the underlying proxy.
- IpcProtocol actualProxy = super.getProxy(protocol, addr,
- ticket, conf, factory, rpcTimeout);
+ T actualProxy = super.getProxy(protocol, addr, conf, rpcTimeout);
RandomTimeoutInvocationHandler invoker =
new RandomTimeoutInvocationHandler(actualProxy);
- IpcProtocol object = (IpcProtocol)Proxy.newProxyInstance(
+ T wrapperProxy = (T)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
- return object;
- }
-
- /**
- * Call this in order to set this class to run as the RpcEngine for the given protocol
- */
- public static void setProtocolEngine(Configuration conf,
- Class<? extends IpcProtocol> protocol) {
- HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
+ return wrapperProxy;
}
/**
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Fri Jan 18 23:32:39 2013
@@ -73,28 +73,33 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
- TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
- rpcServer.getListenerAddress(), conf, 1000);
+ ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+ try {
+ TestRpc client = clientEngine.getProxy(TestRpc.class,
+ rpcServer.getListenerAddress(), conf, 1000);
- List<Integer> results = new ArrayList<Integer>();
+ List<Integer> results = new ArrayList<Integer>();
- TestThread th1 = new TestThread(client, true, results);
- TestThread th2 = new TestThread(client, false, results);
- TestThread th3 = new TestThread(client, false, results);
- th1.start();
- Thread.sleep(100);
- th2.start();
- Thread.sleep(200);
- th3.start();
-
- th1.join();
- th2.join();
- th3.join();
-
- assertEquals(UNDELAYED, results.get(0).intValue());
- assertEquals(UNDELAYED, results.get(1).intValue());
- assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
- 0xDEADBEEF);
+ TestThread th1 = new TestThread(client, true, results);
+ TestThread th2 = new TestThread(client, false, results);
+ TestThread th3 = new TestThread(client, false, results);
+ th1.start();
+ Thread.sleep(100);
+ th2.start();
+ Thread.sleep(200);
+ th3.start();
+
+ th1.join();
+ th2.join();
+ th3.join();
+
+ assertEquals(UNDELAYED, results.get(0).intValue());
+ assertEquals(UNDELAYED, results.get(1).intValue());
+ assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
+ 0xDEADBEEF);
+ } finally {
+ clientEngine.close();
+ }
}
private static class ListAppender extends AppenderSkeleton {
@@ -136,32 +141,38 @@ public class TestDelayedRpc {
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
- TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
- rpcServer.getListenerAddress(), conf, 1000);
- Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
+ ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+ try {
+ TestRpc client = clientEngine.getProxy(TestRpc.class,
+ rpcServer.getListenerAddress(), conf, 1000);
- for (int i = 0; i < MAX_DELAYED_RPC; i++) {
- threads[i] = new TestThread(client, true, null);
- threads[i].start();
- }
+ Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
- /* No warnings till here. */
- assertTrue(listAppender.getMessages().isEmpty());
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i] = new TestThread(client, true, null);
+ threads[i].start();
+ }
- /* This should give a warning. */
- threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
- threads[MAX_DELAYED_RPC].start();
+ /* No warnings till here. */
+ assertTrue(listAppender.getMessages().isEmpty());
- for (int i = 0; i < MAX_DELAYED_RPC; i++) {
- threads[i].join();
- }
+ /* This should give a warning. */
+ threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
+ threads[MAX_DELAYED_RPC].start();
- assertFalse(listAppender.getMessages().isEmpty());
- assertTrue(listAppender.getMessages().get(0).startsWith(
- "Too many delayed calls"));
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i].join();
+ }
- log.removeAppender(listAppender);
+ assertFalse(listAppender.getMessages().isEmpty());
+ assertTrue(listAppender.getMessages().get(0).startsWith(
+ "Too many delayed calls"));
+
+ log.removeAppender(listAppender);
+ } finally {
+ clientEngine.close();
+ }
}
public interface TestRpc extends IpcProtocol {
@@ -178,7 +189,6 @@ public class TestDelayedRpc {
/**
* @param delayReturnValue Should the response to the delayed call be set
* at the start or the end of the delay.
- * @param delay Amount of milliseconds to delay the call by
*/
public TestRpcImpl(boolean delayReturnValue) {
this.delayReturnValue = delayReturnValue;
@@ -251,29 +261,34 @@ public class TestDelayedRpc {
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
- TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
- rpcServer.getListenerAddress(), conf, 1000);
+ ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
+ try {
+ TestRpc client = clientEngine.getProxy(TestRpc.class,
+ rpcServer.getListenerAddress(), conf, 1000);
- int result = 0xDEADBEEF;
+ int result = 0xDEADBEEF;
- try {
- result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
- } catch (Exception e) {
- fail("No exception should have been thrown.");
- }
- assertEquals(result, UNDELAYED);
+ try {
+ result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
+ } catch (Exception e) {
+ fail("No exception should have been thrown.");
+ }
+ assertEquals(result, UNDELAYED);
- boolean caughtException = false;
- try {
- result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
- } catch(Exception e) {
- // Exception thrown by server is enclosed in a RemoteException.
- if (e.getCause().getMessage().contains(
- "java.lang.Exception: Something went wrong"))
- caughtException = true;
- Log.warn(e);
+ boolean caughtException = false;
+ try {
+ result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
+ } catch(Exception e) {
+ // Exception thrown by server is enclosed in a RemoteException.
+ if (e.getCause().getMessage().contains(
+ "java.lang.Exception: Something went wrong"))
+ caughtException = true;
+ Log.warn(e);
+ }
+ assertTrue(caughtException);
+ } finally {
+ clientEngine.close();
}
- assertTrue(caughtException);
}
/**
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java Fri Jan 18 23:32:39 2013
@@ -82,9 +82,6 @@ public class TestProtoBufRpc {
@Before
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
- // Set RPC engine to protobuf RPC engine
- HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
- HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
@@ -102,38 +99,29 @@ public class TestProtoBufRpc {
server.stop();
}
- private static TestRpcService getClient() throws IOException {
- // Set RPC engine to protobuf RPC engine
- HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
- HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
-
- return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class,
- addr, conf, 10000);
- }
-
@Test
public void testProtoBufRpc() throws Exception {
- TestRpcService client = getClient();
- testProtoBufRpc(client);
- }
-
- // separated test out so that other tests can call it.
- public static void testProtoBufRpc(TestRpcService client) throws Exception {
- // Test ping method
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
- client.ping(null, emptyRequest);
-
- // Test echo method
- EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
- .setMessage("hello").build();
- EchoResponseProto echoResponse = client.echo(null, echoRequest);
- Assert.assertEquals(echoResponse.getMessage(), "hello");
-
- // Test error method - error should be thrown as RemoteException
+ ProtobufRpcClientEngine clientEngine = new ProtobufRpcClientEngine(conf);
try {
- client.error(null, emptyRequest);
- Assert.fail("Expected exception is not thrown");
- } catch (ServiceException e) {
+ TestRpcService client = clientEngine.getProxy(TestRpcService.class, addr, conf, 10000);
+ // Test ping method
+ EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+ client.ping(null, emptyRequest);
+
+ // Test echo method
+ EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
+ .setMessage("hello").build();
+ EchoResponseProto echoResponse = client.echo(null, echoRequest);
+ Assert.assertEquals(echoResponse.getMessage(), "hello");
+
+ // Test error method - error should be thrown as RemoteException
+ try {
+ client.error(null, emptyRequest);
+ Assert.fail("Expected exception is not thrown");
+ } catch (ServiceException e) {
+ }
+ } finally {
+ clientEngine.close();
}
}
}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Fri Jan 18 23:32:39 2013
@@ -160,7 +160,7 @@ public class TestCatalogJanitor {
this.ct.stop();
}
if (this.connection != null) {
- HConnectionManager.deleteConnection(this.connection.getConfiguration(), true);
+ HConnectionManager.deleteConnection(this.connection.getConfiguration());
}
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java Fri Jan 18 23:32:39 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.junit.Test;
@@ -50,31 +51,36 @@ public class TestHMasterRPCException {
ServerName sm = hm.getServerName();
InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
- int i = 0;
- //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
- //try to connect too soon. Retry on SocketTimeoutException.
- while (i < 20) {
- try {
- MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy(
- MasterMonitorProtocol.class, isa, conf, 100 * 10);
- inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
- fail();
- } catch (ServiceException ex) {
- IOException ie = ProtobufUtil.getRemoteException(ex);
- if (!(ie instanceof SocketTimeoutException)) {
- if(ie.getMessage().startsWith(
- "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
- return;
+ ProtobufRpcClientEngine engine = new ProtobufRpcClientEngine(conf);
+ try {
+ int i = 0;
+ //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
+ //try to connect too soon. Retry on SocketTimeoutException.
+ while (i < 20) {
+ try {
+ MasterMonitorProtocol inf = engine.getProxy(
+ MasterMonitorProtocol.class, isa, conf, 100 * 10);
+ inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
+ fail();
+ } catch (ServiceException ex) {
+ IOException ie = ProtobufUtil.getRemoteException(ex);
+ if (!(ie instanceof SocketTimeoutException)) {
+ if(ie.getMessage().startsWith(
+ "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
+ return;
+ }
+ } else {
+ System.err.println("Got SocketTimeoutException. Will retry. ");
}
- } else {
- System.err.println("Got SocketTimeoutException. Will retry. ");
+ } catch (Throwable t) {
+ fail("Unexpected throwable: " + t);
}
- } catch (Throwable t) {
- fail("Unexpected throwable: " + t);
+ Thread.sleep(100);
+ i++;
}
- Thread.sleep(100);
- i++;
+ fail();
+ } finally {
+ engine.close();
}
- fail();
}
}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Fri Jan 18 23:32:39 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ipc.Block
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -363,20 +364,25 @@ public class TestTokenAuthentication {
public Object run() throws Exception {
Configuration c = server.getConfiguration();
c.set(HConstants.CLUSTER_ID, clusterId.toString());
- AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
- (AuthenticationProtos.AuthenticationService.BlockingInterface)
- HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class,
- server.getAddress(), c,
- HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
- AuthenticationProtos.WhoAmIResponse response =
- proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
- String myname = response.getUsername();
- assertEquals("testuser", myname);
- String authMethod = response.getAuthMethod();
- assertEquals("TOKEN", authMethod);
+ ProtobufRpcClientEngine rpcClient =
+ new ProtobufRpcClientEngine(c);
+ try {
+ AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
+ HBaseClientRPC.waitForProxy(rpcClient, BlockingAuthenticationService.class,
+ server.getAddress(), c,
+ HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
+ AuthenticationProtos.WhoAmIResponse response =
+ proxy.whoami(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
+ String myname = response.getUsername();
+ assertEquals("testuser", myname);
+ String authMethod = response.getAuthMethod();
+ assertEquals("TOKEN", authMethod);
+ } finally {
+ rpcClient.close();
+ }
return null;
}
});
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java Fri Jan 18 23:32:39 2013
@@ -108,7 +108,7 @@ public class OfflineMetaRebuildTestCore
@After
public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster();
- HConnectionManager.deleteConnection(conf, true);
+ HConnectionManager.deleteConnection(conf);
}
/**
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java?rev=1435414&r1=1435413&r2=1435414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java Fri Jan 18 23:32:39 2013
@@ -57,7 +57,7 @@ public class TestOfflineMetaRebuildBase
// shutdown the minicluster
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniZKCluster();
- HConnectionManager.deleteConnection(conf, false);
+ HConnectionManager.deleteConnection(conf);
// rebuild meta table from scratch
HBaseFsck fsck = new HBaseFsck(conf);