You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2013/01/29 01:50:03 UTC
svn commit: r1439723 - in /hbase/branches/0.94:
security/src/main/java/org/apache/hadoop/hbase/ipc/
src/main/jamon/org/apache/hadoop/hbase/tmpl/master/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/ipc/ src/main/ja...
Author: garyh
Date: Tue Jan 29 00:50:02 2013
New Revision: 1439723
URL: http://svn.apache.org/viewvc?rev=1439723&view=rev
Log:
HBASE-7626 Backport client connection cleanup from HBASE-7460
Modified:
hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
hbase/branches/0.94/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java
Modified: hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (original)
+++ hbase/branches/0.94/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Tue Jan 29 00:50:02 2013
@@ -20,11 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
@@ -33,20 +29,11 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.*;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.util.HashMap;
-import java.util.Map;
/**
* A loadable RPC engine supporting SASL authentication of connections, using
@@ -64,93 +51,46 @@ import java.util.Map;
*/
public class SecureRpcEngine implements RpcEngine {
// Leave this out in the hadoop ipc package but keep class name. Do this
- // so that we dont' get the logging of this class's invocations by doing our
+ // so that we do not get the logging of this class' invocations by doing our
// blanket enabling DEBUG on the o.a.h.h. package.
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.SecureRpcEngine");
- private SecureRpcEngine() {
- super();
- } // no public ctor
-
- /* Cache a client using the configured clusterId */
- static private class ClientCache {
- private Map<String, SecureClient> clients =
- new HashMap<String, SecureClient>();
-
- protected ClientCache() {}
-
- /**
- * Construct & cache an IPC client with the configured
- * {@link HConstants#CLUSTER_ID} if no cached client exists.
- *
- * @param conf
- * Configuration
- * @param factory
- * socket factory
- * @return an IPC client
- */
- protected synchronized SecureClient getClient(Configuration conf,
- SocketFactory factory) {
- String clusterId = conf.get(HConstants.CLUSTER_ID, "default");
- SecureClient client = clients.get(clusterId);
- if (client == null) {
- // Make an hbase client instead of hadoop Client.
- client = new SecureClient(HbaseObjectWritable.class, conf, factory);
- clients.put(clusterId, client);
- } else {
- client.incCount();
- }
- return client;
- }
+ private Configuration conf;
+ private SecureClient client;
- /**
- * Construct & cache an IPC client with the configured
- * {@link HConstants#CLUSTER_ID} if no cached client exists.
- *
- * @param conf
- * Configuration
- * @return an IPC client
- */
- protected synchronized SecureClient getClient(Configuration conf) {
- return getClient(conf, SocketFactory.getDefault());
+ @Override
+ public void setConf(Configuration config) {
+ this.conf = config;
+ if (User.isHBaseSecurityEnabled(conf)) {
+ HBaseSaslRpcServer.init(conf);
}
-
- /**
- * Stop a RPC client connection
- * A RPC client is closed only when its reference count becomes zero.
- * @param client client to stop
- */
- protected void stopClient(SecureClient client) {
- synchronized (this) {
- client.decCount();
- if (client.isZeroReference()) {
- clients.remove(client.getClusterId());
- }
- }
- if (client.isZeroReference()) {
- client.stop();
- }
+ // check for an already created client
+ if (this.client != null) {
+ this.client.stop();
}
+ this.client = new SecureClient(HbaseObjectWritable.class, conf);
}
- protected final static ClientCache CLIENTS = new ClientCache();
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
private static class Invoker implements InvocationHandler {
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private SecureClient client;
- private boolean isClosed = false;
final private int rpcTimeout;
- public Invoker(Class<? extends VersionedProtocol> protocol,
- InetSocketAddress address, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) {
+ public Invoker(SecureClient client,
+ Class<? extends VersionedProtocol> protocol,
+ InetSocketAddress address, User ticket, int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory);
+ this.client = client;
this.rpcTimeout = rpcTimeout;
}
@@ -170,14 +110,6 @@ public class SecureRpcEngine implements
}
return value.get();
}
-
- /* close the IPC client that's responsible for this invoker's RPCs */
- synchronized protected void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
}
/**
@@ -187,24 +119,30 @@ public class SecureRpcEngine implements
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
- * @param ticket ticket
* @param conf configuration
- * @param factory socket factory
* @return proxy
* @throws java.io.IOException e
*/
- public VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol, long clientVersion,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
+ @Override
+ public <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol, long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf, int rpcTimeout)
throws IOException {
- if (User.isSecurityEnabled()) {
- HBaseSaslRpcServer.init(conf);
+ if (this.client == null) {
+ throw new IOException("Client must be initialized by calling setConf(Configuration)");
}
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
+
+ T proxy =
+ (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+ new Invoker(this.client, protocol, addr, User.getCurrent(),
+ HBaseRPC.getRpcTimeout(rpcTimeout)));
+ /*
+ * TODO: checking protocol version only needs to be done once when we set up a new
+ * SecureClient.Connection. Doing it every time we retrieve a proxy instance results
+ * in unnecessary RPC traffic.
+ */
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion != clientVersion) {
@@ -214,50 +152,48 @@ public class SecureRpcEngine implements
return proxy;
}
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public void stopProxy(VersionedProtocol proxy) {
- if (proxy!=null) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
- }
-
-
/** Expert: Make multiple, parallel calls to a set of servers. */
+ @Override
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
Class<? extends VersionedProtocol> protocol,
User ticket, Configuration conf)
throws IOException, InterruptedException {
+ if (this.client == null) {
+ throw new IOException("Client must be initialized by calling setConf(Configuration)");
+ }
Invocation[] invocations = new Invocation[params.length];
- for (int i = 0; i < params.length; i++)
+ for (int i = 0; i < params.length; i++) {
invocations[i] = new Invocation(method, protocol, params[i]);
- SecureClient client = CLIENTS.getClient(conf);
- try {
- Writable[] wrappedValues =
- client.call(invocations, addrs, protocol, ticket);
+ }
- if (method.getReturnType() == Void.TYPE) {
- return null;
- }
+ Writable[] wrappedValues =
+ client.call(invocations, addrs, protocol, ticket);
+
+ if (method.getReturnType() == Void.TYPE) {
+ return null;
+ }
+
+ Object[] values =
+ (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+ for (int i = 0; i < values.length; i++)
+ if (wrappedValues[i] != null)
+ values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+
+ return values;
+ }
- Object[] values =
- (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
- for (int i = 0; i < values.length; i++)
- if (wrappedValues[i] != null)
- values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-
- return values;
- } finally {
- CLIENTS.stopClient(client);
+ @Override
+ public void close() {
+ if (this.client != null) {
+ this.client.stop();
}
}
/** Construct a server for a protocol implementation instance listening on a
* port and address, with a secret manager. */
+ @Override
public Server getServer(Class<? extends VersionedProtocol> protocol,
final Object instance,
Class<?>[] ifaces,
Modified: hbase/branches/0.94/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon (original)
+++ hbase/branches/0.94/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon Tue Jan 29 00:50:02 2013
@@ -169,7 +169,7 @@ org.apache.hadoop.hbase.HBaseConfigurati
<%def userTables>
<%java>
HTableDescriptor[] tables = admin.listTables();
- HConnectionManager.deleteConnection(admin.getConfiguration(), false);
+ HConnectionManager.deleteConnection(admin.getConfiguration());
</%java>
<%if (tables != null && tables.length > 0)%>
<table>
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Jan 29 00:50:02 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.ipc.ExecR
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.RpcEngine;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,7 +103,7 @@ import org.apache.zookeeper.KeeperExcept
* <p>But sharing connections
* makes clean up of {@link HConnection} instances a little awkward. Currently,
* clients cleanup by calling
- * {@link #deleteConnection(Configuration, boolean)}. This will shutdown the
+ * {@link #deleteConnection(Configuration)}. This will shutdown the
* zookeeper connection the HConnection was using and clean up all
* HConnection resources as well as stopping proxies to servers out on the
* cluster. Not running the cleanup will not end the world; it'll
@@ -123,7 +124,7 @@ import org.apache.zookeeper.KeeperExcept
* }
* </pre>
* <p>Cleanup used to be done inside in a shutdown hook. On startup we'd
- * register a shutdown hook that called {@link #deleteAllConnections(boolean)}
+ * register a shutdown hook that called {@link #deleteAllConnections()}
* on its way out but the order in which shutdown hooks run is not defined so
* were problematic for clients of HConnection that wanted to register their
* own shutdown hooks so we removed ours though this shifts the onus for
@@ -183,7 +184,7 @@ public class HConnectionManager {
connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection);
} else if (connection.isClosed()) {
- HConnectionManager.deleteConnection(connectionKey, true, true);
+ HConnectionManager.deleteConnection(connectionKey, true);
connection = new HConnectionImplementation(conf, true);
HBASE_INSTANCES.put(connectionKey, connection);
}
@@ -216,13 +217,25 @@ public class HConnectionManager {
* configuration whose identity is used to find {@link HConnection}
* instance.
* @param stopProxy
- * Shuts down all the proxy's put up to cluster members including to
- * cluster HMaster. Calls
- * {@link HBaseRPC#stopProxy(org.apache.hadoop.hbase.ipc.VersionedProtocol)}
- * .
+ * No longer used. This parameter is ignored.
+ * @deprecated use {@link #deleteConnection(org.apache.hadoop.conf.Configuration)} instead
*/
+ @Deprecated
public static void deleteConnection(Configuration conf, boolean stopProxy) {
- deleteConnection(new HConnectionKey(conf), stopProxy, false);
+ deleteConnection(conf);
+ }
+
+ /**
+ * Delete connection information for the instance specified by configuration.
+ * If there are no more references to it, this will then close connection to
+ * the zookeeper ensemble and let go of all resources.
+ *
+ * @param conf
+ * configuration whose identity is used to find {@link HConnection}
+ * instance.
+ */
+ public static void deleteConnection(Configuration conf) {
+ deleteConnection(new HConnectionKey(conf), false);
}
/**
@@ -233,32 +246,40 @@ public class HConnectionManager {
* @param connection
*/
public static void deleteStaleConnection(HConnection connection) {
- deleteConnection(connection, true, true);
+ deleteConnection(connection, true);
}
/**
* Delete information for all connections.
- * @param stopProxy stop the proxy as well
- * @throws IOException
+ * @param stopProxy No longer used. This parameter is ignored.
+ * @deprecated use {@link #deleteAllConnections()} instead
*/
+ @Deprecated
public static void deleteAllConnections(boolean stopProxy) {
+ deleteAllConnections();
+ }
+
+ /**
+ * Delete information for all connections.
+ * @throws IOException
+ */
+ public static void deleteAllConnections() {
synchronized (HBASE_INSTANCES) {
Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
connectionKeys.addAll(HBASE_INSTANCES.keySet());
for (HConnectionKey connectionKey : connectionKeys) {
- deleteConnection(connectionKey, stopProxy, false);
+ deleteConnection(connectionKey, false);
}
HBASE_INSTANCES.clear();
}
}
- private static void deleteConnection(HConnection connection, boolean stopProxy,
- boolean staleConnection) {
+ private static void deleteConnection(HConnection connection, boolean staleConnection) {
synchronized (HBASE_INSTANCES) {
for (Entry<HConnectionKey, HConnectionImplementation> connectionEntry : HBASE_INSTANCES
.entrySet()) {
if (connectionEntry.getValue() == connection) {
- deleteConnection(connectionEntry.getKey(), stopProxy, staleConnection);
+ deleteConnection(connectionEntry.getKey(), staleConnection);
break;
}
}
@@ -266,7 +287,7 @@ public class HConnectionManager {
}
private static void deleteConnection(HConnectionKey connectionKey,
- boolean stopProxy, boolean staleConnection) {
+ boolean staleConnection) {
synchronized (HBASE_INSTANCES) {
HConnectionImplementation connection = HBASE_INSTANCES
.get(connectionKey);
@@ -274,9 +295,7 @@ public class HConnectionManager {
connection.decCount();
if (connection.isZeroReference() || staleConnection) {
HBASE_INSTANCES.remove(connectionKey);
- connection.close(stopProxy);
- } else if (stopProxy) {
- connection.stopProxyOnClose(stopProxy);
+ connection.internalClose();
}
}else {
LOG.error("Connection not found in the list, can't delete it "+
@@ -514,6 +533,9 @@ public class HConnectionManager {
private final Object resetLock = new Object();
private final Configuration conf;
+
+ private RpcEngine rpcEngine;
+
// Known region HServerAddress.toString() -> HRegionInterface
private final Map<String, HRegionInterface> servers =
@@ -541,7 +563,6 @@ public class HConnectionManager {
private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>();
- private boolean stopProxy;
private int refCount;
// indicates whether this connection's life cycle is managed
@@ -579,6 +600,7 @@ public class HConnectionManager {
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
+ this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
this.master = null;
this.resetting = false;
}
@@ -683,7 +705,7 @@ public class HConnectionManager {
}
InetSocketAddress isa =
new InetSocketAddress(sn.getHostname(), sn.getPort());
- HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
+ HMasterInterface tryMaster = rpcEngine.getProxy(
HMasterInterface.class, HMasterInterface.VERSION, isa, this.conf,
this.rpcTimeout);
@@ -1310,7 +1332,7 @@ public class HConnectionManager {
InetSocketAddress address = isa != null? isa:
new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS
- server = (HRegionInterface) HBaseRPC.waitForProxy(
+ server = HBaseRPC.waitForProxy(this.rpcEngine,
serverInterfaceClass, HRegionInterface.VERSION,
address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
@@ -1723,10 +1745,6 @@ public class HConnectionManager {
}
}
- public void stopProxyOnClose(boolean stopProxy) {
- this.stopProxy = stopProxy;
- }
-
/**
* Increment this client's reference count.
*/
@@ -1752,22 +1770,17 @@ public class HConnectionManager {
return refCount == 0;
}
- void close(boolean stopProxy) {
+ void internalClose() {
if (this.closed) {
return;
}
- if (master != null) {
- if (stopProxy) {
- HBaseRPC.stopProxy(master);
- }
- master = null;
- }
- if (stopProxy) {
- for (HRegionInterface i : servers.values()) {
- HBaseRPC.stopProxy(i);
- }
- }
+ master = null;
+
this.servers.clear();
+ if (this.rpcEngine != null) {
+ this.rpcEngine.close();
+ }
+
if (this.zooKeeper != null) {
LOG.info("Closed zookeeper sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
@@ -1782,21 +1795,21 @@ public class HConnectionManager {
if (aborted) {
HConnectionManager.deleteStaleConnection(this);
} else {
- HConnectionManager.deleteConnection(this, stopProxy, false);
+ HConnectionManager.deleteConnection(this, false);
}
} else {
- close(true);
+ internalClose();
}
if (LOG.isTraceEnabled()) LOG.debug("" + this.zooKeeper + " closed.");
}
/**
* Close the connection for good, regardless of what the current value of
- * {@link #refCount} is. Ideally, {@link refCount} should be zero at this
+ * {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
* point, which would be the case if all of its consumers close the
* connection. However, on the off chance that someone is unable to close
* the connection, perhaps because it bailed out prematurely, the method
- * below will ensure that this {@link Connection} instance is cleaned up.
+ * below will ensure that this {@link HConnection} instance is cleaned up.
* Caveat: The JVM may take an unknown amount of time to call finalize on an
* unreachable object, so our hope is that every consumer cleans up after
* itself, like any good citizen.
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Jan 29 00:50:02 2013
@@ -90,7 +90,6 @@ public class HBaseClient {
protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
- private int refCount = 1;
protected String clusterId;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -198,31 +197,6 @@ public class HBaseClient {
return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
}
- /**
- * Increment this client's reference count
- *
- */
- synchronized void incCount() {
- refCount++;
- }
-
- /**
- * Decrement this client's reference count
- *
- */
- synchronized void decCount() {
- refCount--;
- }
-
- /**
- * Return if this client has no reference
- *
- * @return true if this client has no reference; false otherwise
- */
- synchronized boolean isZeroReference() {
- return refCount==0;
- }
-
/** A call waiting for a value. */
protected class Call {
final int id; // call id
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Jan 29 00:50:02 2013
@@ -83,14 +83,6 @@ public class HBaseRPC {
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
- // cache of RpcEngines by protocol
- private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
- = new HashMap<Class,RpcEngine>();
-
- // track what RpcEngine is used by a proxy class, for stopProxy()
- private static final Map<Class,RpcEngine> PROXY_ENGINES
- = new HashMap<Class,RpcEngine>();
-
// thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
@@ -99,38 +91,17 @@ public class HBaseRPC {
}
};
- // set a protocol to use a non-default RpcEngine
- static void setProtocolEngine(Configuration conf,
- Class protocol, Class engine) {
- conf.setClass(RPC_ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
- }
-
- // return the RpcEngine configured to handle a protocol
- private static synchronized RpcEngine getProtocolEngine(Class protocol,
- Configuration conf) {
- RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
- if (engine == null) {
- // check for a configured default engine
- Class<?> defaultEngine =
- conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
-
- // check for a per interface override
- Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
- defaultEngine);
- LOG.debug("Using "+impl.getName()+" for "+protocol.getName());
- engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
- if (protocol.isInterface())
- PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
- protocol),
- engine);
- PROTOCOL_ENGINES.put(protocol, engine);
- }
- return engine;
- }
+ /**
+ * Returns a new instance of the configured {@link RpcEngine} implementation.
+ */
+ public static synchronized RpcEngine getProtocolEngine(Configuration conf) {
+ // check for a configured default engine
+ Class<?> impl =
+ conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
- // return the RpcEngine that handles a proxy object
- private static synchronized RpcEngine getProxyEngine(Object proxy) {
- return PROXY_ENGINES.get(proxy.getClass());
+ LOG.debug("Using RpcEngine: "+impl.getName());
+ RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
+ return engine;
}
/**
@@ -219,21 +190,22 @@ public class HBaseRPC {
* @throws IOException e
*/
@SuppressWarnings("unchecked")
- public static VersionedProtocol waitForProxy(Class protocol,
+ public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient,
+ Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
int rpcTimeout,
long timeout
- ) throws IOException {
+ ) throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
long startTime = System.currentTimeMillis();
IOException ioe;
int reconnectAttempts = 0;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
+ return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(SocketTimeoutException te) { // namenode is busy
LOG.info("Problem connecting to server: " + addr);
ioe = te;
@@ -294,88 +266,6 @@ public class HBaseRPC {
}
/**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param protocol interface
- * @param clientVersion version we are expecting
- * @param addr remote address
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout timeout for each RPC
- * @return proxy
- * @throws IOException e
- */
- public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
- long clientVersion, InetSocketAddress addr, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException {
- return getProxy(protocol, clientVersion, addr,
- User.getCurrent(), conf, factory, rpcTimeout);
- }
-
- /**
- * Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address.
- *
- * @param protocol interface
- * @param clientVersion version we are expecting
- * @param addr remote address
- * @param ticket ticket
- * @param conf configuration
- * @param factory socket factory
- * @param rpcTimeout timeout for each RPC
- * @return proxy
- * @throws IOException e
- */
- public static VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol,
- long clientVersion, InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
- throws IOException {
- VersionedProtocol proxy =
- getProtocolEngine(protocol,conf)
- .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- }
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
-
- /**
- * Construct a client-side proxy object with the default SocketFactory
- *
- * @param protocol interface
- * @param clientVersion version we are expecting
- * @param addr remote address
- * @param conf configuration
- * @param rpcTimeout timeout for each RPC
- * @return a proxy instance
- * @throws IOException e
- */
- public static VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol,
- long clientVersion, InetSocketAddress addr, Configuration conf,
- int rpcTimeout)
- throws IOException {
-
- return getProxy(protocol, clientVersion, addr, conf, NetUtils
- .getDefaultSocketFactory(conf), rpcTimeout);
- }
-
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public static void stopProxy(VersionedProtocol proxy) {
- if (proxy!=null) {
- getProxyEngine(proxy).stopProxy(proxy);
- }
- }
-
- /**
* Expert: Make multiple, parallel calls to a set of servers.
*
* @param method method to invoke
@@ -385,7 +275,7 @@ public class HBaseRPC {
* @return values
* @throws IOException e
* @deprecated Instead of calling statically, use
- * {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)}
+ * {@link HBaseRPC#getProtocolEngine(org.apache.hadoop.conf.Configuration)}
* to obtain an {@link RpcEngine} instance and then use
* {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
*/
@@ -396,8 +286,15 @@ public class HBaseRPC {
User ticket,
Configuration conf)
throws IOException, InterruptedException {
- return getProtocolEngine(protocol, conf)
- .call(method, params, addrs, protocol, ticket, conf);
+ Object[] result = null;
+ RpcEngine engine = null;
+ try {
+ engine = getProtocolEngine(conf);
+ result = engine.call(method, params, addrs, protocol, ticket, conf);
+ } finally {
+ engine.close();
+ }
+ return result;
}
/**
@@ -430,7 +327,7 @@ public class HBaseRPC {
final int numHandlers,
int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
- return getProtocolEngine(protocol, conf)
+ return getProtocolEngine(conf)
.getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
}
@@ -445,4 +342,12 @@ public class HBaseRPC {
public static void resetRpcTimeout() {
HBaseRPC.rpcTimeout.remove();
}
+
+ /**
+ * Returns the lower of the thread-local RPC timeout from {@link #setRpcTimeout(int)} and the
+ * given default timeout.
+ */
+ public static int getRpcTimeout(int defaultTimeout) {
+ return Math.min(defaultTimeout, HBaseRPC.rpcTimeout.get());
+ }
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java Tue Jan 29 00:50:02 2013
@@ -22,23 +22,24 @@ package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Method;
import java.io.IOException;
import java.net.InetSocketAddress;
-import javax.net.SocketFactory;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.conf.Configuration;
/** An RPC implementation. */
-interface RpcEngine {
+@InterfaceAudience.Private
+public interface RpcEngine extends Configurable {
+ /* Client-related methods */
/** Construct a client-side proxy object. */
- VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
- long clientVersion, InetSocketAddress addr,
- User ticket, Configuration conf,
- SocketFactory factory, int rpcTimeout) throws IOException;
+ <T extends VersionedProtocol> T getProxy(Class<T> protocol,
+ long clientVersion, InetSocketAddress addr,
+ Configuration conf, int rpcTimeout) throws IOException;
- /** Stop this proxy. */
- void stopProxy(VersionedProtocol proxy);
+ /** Shutdown this instance */
+ void close();
/** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
@@ -46,6 +47,7 @@ interface RpcEngine {
User ticket, Configuration conf)
throws IOException, InterruptedException;
+ /* Server-related methods */
/** Construct a server for a protocol implementation instance. */
RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,
Class<?>[] ifaces, String bindAddress,
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Tue Jan 29 00:50:02 2013
@@ -57,85 +57,21 @@ class WritableRpcEngine implements RpcEn
// DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
- /* Cache a client using its socket factory as the hash key */
- static private class ClientCache {
- private Map<SocketFactory, HBaseClient> clients =
- new HashMap<SocketFactory, HBaseClient>();
-
- protected ClientCache() {}
-
- /**
- * Construct & cache an IPC client with the user-provided SocketFactory
- * if no cached client exists.
- *
- * @param conf Configuration
- * @param factory socket factory
- * @return an IPC client
- */
- protected synchronized HBaseClient getClient(Configuration conf,
- SocketFactory factory) {
- // Construct & cache client. The configuration is only used for timeout,
- // and Clients have connection pools. So we can either (a) lose some
- // connection pooling and leak sockets, or (b) use the same timeout for
- // all configurations. Since the IPC is usually intended globally, not
- // per-job, we choose (a).
- HBaseClient client = clients.get(factory);
- if (client == null) {
- // Make an hbase client instead of hadoop Client.
- client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
- clients.put(factory, client);
- } else {
- client.incCount();
- }
- return client;
- }
-
- /**
- * Construct & cache an IPC client with the default SocketFactory
- * if no cached client exists.
- *
- * @param conf Configuration
- * @return an IPC client
- */
- protected synchronized HBaseClient getClient(Configuration conf) {
- return getClient(conf, SocketFactory.getDefault());
- }
-
- /**
- * Stop a RPC client connection
- * A RPC client is closed only when its reference count becomes zero.
- * @param client client to stop
- */
- protected void stopClient(HBaseClient client) {
- synchronized (this) {
- client.decCount();
- if (client.isZeroReference()) {
- clients.remove(client.getSocketFactory());
- }
- }
- if (client.isZeroReference()) {
- client.stop();
- }
- }
- }
-
- protected final static ClientCache CLIENTS = new ClientCache();
-
private static class Invoker implements InvocationHandler {
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
- private boolean isClosed = false;
final private int rpcTimeout;
- public Invoker(Class<? extends VersionedProtocol> protocol,
+ public Invoker(HBaseClient client,
+ Class<? extends VersionedProtocol> protocol,
InetSocketAddress address, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) {
+ Configuration conf, int rpcTimeout) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory);
+ this.client = client;
this.rpcTimeout = rpcTimeout;
}
@@ -157,78 +93,98 @@ class WritableRpcEngine implements RpcEn
}
return value.get();
}
+ }
- /* close the IPC client that's responsible for this invoker's RPCs */
- synchronized protected void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
+ private Configuration conf;
+ private HBaseClient client;
+
+ @Override
+ public void setConf(Configuration config) {
+ this.conf = config;
+ // check for an already created client
+ if (this.client != null) {
+ this.client.stop();
}
+ this.client = new HBaseClient(HbaseObjectWritable.class, conf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol, long clientVersion,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
+ @Override
+ public <T extends VersionedProtocol> T getProxy(
+ Class<T> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, int rpcTimeout)
throws IOException {
+ if (this.client == null) {
+ throw new IOException("Client must be initialized by calling setConf(Configuration)");
+ }
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
+ T proxy =
+ (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- if (proxy instanceof VersionedProtocol) {
- long serverVersion = ((VersionedProtocol)proxy)
- .getProtocolVersion(protocol.getName(), clientVersion);
- if (serverVersion != clientVersion) {
- throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
+ new Invoker(client, protocol, addr, User.getCurrent(), conf,
+ HBaseRPC.getRpcTimeout(rpcTimeout)));
+
+ /*
+ * TODO: checking the protocol version only needs to be done once, when we set up a new
+ * HBaseClient.Connection. Doing it every time we retrieve a proxy instance results
+ * in unnecessary RPC traffic.
+ */
+ long serverVersion = ((VersionedProtocol)proxy)
+ .getProtocolVersion(protocol.getName(), clientVersion);
+ if (serverVersion != clientVersion) {
+ throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
}
+
return proxy;
}
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public void stopProxy(VersionedProtocol proxy) {
- if (proxy!=null) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
- }
/** Expert: Make multiple, parallel calls to a set of servers. */
+ @Override
public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs,
Class<? extends VersionedProtocol> protocol,
User ticket, Configuration conf)
throws IOException, InterruptedException {
+ if (this.client == null) {
+ throw new IOException("Client must be initialized by calling setConf(Configuration)");
+ }
Invocation[] invocations = new Invocation[params.length];
- for (int i = 0; i < params.length; i++)
+ for (int i = 0; i < params.length; i++) {
invocations[i] = new Invocation(method, protocol, params[i]);
- HBaseClient client = CLIENTS.getClient(conf);
- try {
+ }
+
Writable[] wrappedValues =
- client.call(invocations, addrs, protocol, ticket);
+ client.call(invocations, addrs, protocol, ticket);
if (method.getReturnType() == Void.TYPE) {
return null;
}
Object[] values =
- (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
- for (int i = 0; i < values.length; i++)
- if (wrappedValues[i] != null)
+ (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+ for (int i = 0; i < values.length; i++) {
+ if (wrappedValues[i] != null) {
values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+ }
+ }
return values;
- } finally {
- CLIENTS.stopClient(client);
+ }
+
+ @Override
+ public void close() {
+ if (this.client != null) {
+ this.client.stop();
}
}
@@ -428,7 +384,7 @@ class WritableRpcEngine implements RpcEn
* client Operations.
* @param call The call to log.
* @param tag The tag that will be used to indicate this event in the log.
- * @param client The address of the client who made this call.
+ * @param clientAddress The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms.
* @param processingTime The duration that the call took to run, in ms.
* @param qTime The duration that the call spent on the queue
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jan 29 00:50:02 2013
@@ -123,6 +123,7 @@ import org.apache.hadoop.hbase.ipc.HMast
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
+import org.apache.hadoop.hbase.ipc.RpcEngine;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -234,6 +235,9 @@ public class HRegionServer implements HR
// Remote HMaster
private HMasterRegionInterface hbaseMaster;
+ // RPC Engine for master connection
+ private RpcEngine rpcEngine;
+
// Server to handle client requests. Default access so can be accessed by
// unit tests.
RpcServer rpcServer;
@@ -587,6 +591,8 @@ public class HRegionServer implements HR
for (int i = 0; i < nbBlocks; i++) {
reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
}
+
+ this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
} catch (Throwable t) {
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
@@ -828,10 +834,8 @@ public class HRegionServer implements HR
}
// Make sure the proxy is down.
- if (this.hbaseMaster != null) {
- HBaseRPC.stopProxy(this.hbaseMaster);
- this.hbaseMaster = null;
- }
+ this.hbaseMaster = null;
+ this.rpcEngine.close();
this.leases.close();
if (!killed) {
@@ -1878,7 +1882,7 @@ public class HRegionServer implements HR
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
- master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
+ master = HBaseRPC.waitForProxy(this.rpcEngine,
HMasterRegionInterface.class, HMasterRegionInterface.VERSION,
masterIsa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Jan 29 00:50:02 2013
@@ -151,7 +151,7 @@ public class ReplicationLogCleaner exten
this.zkHelper.getZookeeperWatcher().close();
}
// Not sure why we're deleting a connection that we never acquired or used
- HConnectionManager.deleteConnection(this.getConf(), true);
+ HConnectionManager.deleteConnection(this.getConf());
}
@Override
Modified: hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp (original)
+++ hbase/branches/0.94/src/main/resources/hbase-webapps/master/table.jsp Tue Jan 29 00:50:02 2013
@@ -235,7 +235,7 @@
}
} // end else
-HConnectionManager.deleteConnection(hbadmin.getConfiguration(), false);
+HConnectionManager.deleteConnection(hbadmin.getConfiguration());
%>
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue Jan 29 00:50:02 2013
@@ -491,7 +491,7 @@ public class MiniHBaseCluster extends HB
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
}
- HConnectionManager.deleteAllConnections(false);
+ HConnectionManager.deleteAllConnections();
}
@Override
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Tue Jan 29 00:50:02 2013
@@ -163,7 +163,7 @@ public class TestCatalogTracker {
// Join the thread... should exit shortly.
t.join();
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -237,7 +237,7 @@ public class TestCatalogTracker {
}
} finally {
// Clear out our doctored connection or could mess up subsequent tests.
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -264,7 +264,7 @@ public class TestCatalogTracker {
}
} finally {
// Clear out our doctored connection or could mess up subsequent tests.
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -347,7 +347,7 @@ public class TestCatalogTracker {
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ct.waitForMeta(100);
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
@@ -437,20 +437,20 @@ public class TestCatalogTracker {
// Now meta is available.
Assert.assertTrue(ct.waitForMeta(10000).equals(SN));
} finally {
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
}
}
/**
* @param implementation An {@link HRegionInterface} instance; you'll likely
* want to pass a mocked HRS; can be null.
- * @return Mock up a connection that returns a {@link Configuration} when
+ * @return Mock up a connection that returns a {@link org.apache.hadoop.conf.Configuration} when
* {@link HConnection#getConfiguration()} is called, a 'location' when
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
* and that returns the passed {@link HRegionInterface} instance when
* {@link HConnection#getHRegionConnection(String, int)}
* is called (Be sure call
- * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
+ * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration)}
* when done with this mocked Connection.
* @throws IOException
*/
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Tue Jan 29 00:50:02 2013
@@ -166,7 +166,7 @@ public class TestMetaReaderEditorNoClust
openScanner((byte [])Mockito.any(), (Scan)Mockito.any());
} finally {
if (ct != null) ct.stop();
- HConnectionManager.deleteConnection(UTIL.getConfiguration(), true);
+ HConnectionManager.deleteConnection(UTIL.getConfiguration());
zkw.close();
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Tue Jan 29 00:50:02 2013
@@ -193,7 +193,7 @@ public class TestHCM {
}
/**
- * Make sure that {@link HConfiguration} instances that are essentially the
+ * Make sure that {@link Configuration} instances that are essentially the
* same map to the same {@link HConnection} instance.
*/
@Test
@@ -267,7 +267,7 @@ public class TestHCM {
} finally {
for (HConnection c: connections) {
// Clean up connections made so we don't interfere w/ subsequent tests.
- HConnectionManager.deleteConnection(c.getConfiguration(), true);
+ HConnectionManager.deleteConnection(c.getConfiguration());
}
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Tue Jan 29 00:50:02 2013
@@ -70,30 +70,38 @@ public class TestDelayedRpc {
rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
- rpcServer.start();
+ RpcEngine rpcEngine = null;
+ try {
+ rpcServer.start();
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
- TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
+ TestRpc client = rpcEngine.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
- List<Integer> results = new ArrayList<Integer>();
+ List<Integer> results = new ArrayList<Integer>();
- TestThread th1 = new TestThread(client, true, results);
- TestThread th2 = new TestThread(client, false, results);
- TestThread th3 = new TestThread(client, false, results);
- th1.start();
- Thread.sleep(100);
- th2.start();
- Thread.sleep(200);
- th3.start();
-
- th1.join();
- th2.join();
- th3.join();
-
- assertEquals(UNDELAYED, results.get(0).intValue());
- assertEquals(UNDELAYED, results.get(1).intValue());
- assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
- 0xDEADBEEF);
+ TestThread th1 = new TestThread(client, true, results);
+ TestThread th2 = new TestThread(client, false, results);
+ TestThread th3 = new TestThread(client, false, results);
+ th1.start();
+ Thread.sleep(100);
+ th2.start();
+ Thread.sleep(200);
+ th3.start();
+
+ th1.join();
+ th2.join();
+ th3.join();
+
+ assertEquals(UNDELAYED, results.get(0).intValue());
+ assertEquals(UNDELAYED, results.get(1).intValue());
+ assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
+ 0xDEADBEEF);
+ } finally {
+ if (rpcEngine != null) {
+ rpcEngine.close();
+ }
+ }
}
private static class ListAppender extends AppenderSkeleton {
@@ -133,33 +141,42 @@ public class TestDelayedRpc {
rpcServer = HBaseRPC.getServer(new TestRpcImpl(true),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
- rpcServer.start();
- TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
- rpcServer.getListenerAddress(), conf, 1000);
+ RpcEngine rpcEngine = null;
+ try {
+ rpcServer.start();
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
- Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
+ TestRpc client = rpcEngine.getProxy(TestRpc.class, 0,
+ rpcServer.getListenerAddress(), conf, 1000);
- for (int i = 0; i < MAX_DELAYED_RPC; i++) {
- threads[i] = new TestThread(client, true, null);
- threads[i].start();
- }
+ Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
- /* No warnings till here. */
- assertTrue(listAppender.getMessages().isEmpty());
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i] = new TestThread(client, true, null);
+ threads[i].start();
+ }
- /* This should give a warning. */
- threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
- threads[MAX_DELAYED_RPC].start();
+ /* No warnings till here. */
+ assertTrue(listAppender.getMessages().isEmpty());
- for (int i = 0; i < MAX_DELAYED_RPC; i++) {
- threads[i].join();
- }
+ /* This should give a warning. */
+ threads[MAX_DELAYED_RPC] = new TestThread(client, true, null);
+ threads[MAX_DELAYED_RPC].start();
- assertFalse(listAppender.getMessages().isEmpty());
- assertTrue(listAppender.getMessages().get(0).startsWith(
- "Too many delayed calls"));
+ for (int i = 0; i < MAX_DELAYED_RPC; i++) {
+ threads[i].join();
+ }
- log.removeAppender(listAppender);
+ assertFalse(listAppender.getMessages().isEmpty());
+ assertTrue(listAppender.getMessages().get(0).startsWith(
+ "Too many delayed calls"));
+
+ log.removeAppender(listAppender);
+ } finally {
+ if (rpcEngine != null) {
+ rpcEngine.close();
+ }
+ }
}
public interface TestRpc extends VersionedProtocol {
@@ -177,7 +194,6 @@ public class TestDelayedRpc {
/**
* @param delayReturnValue Should the response to the delayed call be set
* at the start or the end of the delay.
- * @param delay Amount of milliseconds to delay the call by
*/
public TestRpcImpl(boolean delayReturnValue) {
this.delayReturnValue = delayReturnValue;
@@ -256,30 +272,38 @@ public class TestDelayedRpc {
rpcServer = HBaseRPC.getServer(new FaultyTestRpc(),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
- rpcServer.start();
+ RpcEngine rpcEngine = null;
+ try {
+ rpcServer.start();
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
- TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
- rpcServer.getListenerAddress(), conf, 1000);
+ TestRpc client = rpcEngine.getProxy(TestRpc.class, 0,
+ rpcServer.getListenerAddress(), conf, 1000);
- int result = 0xDEADBEEF;
+ int result = 0xDEADBEEF;
- try {
- result = client.test(false);
- } catch (Exception e) {
- fail("No exception should have been thrown.");
- }
- assertEquals(result, UNDELAYED);
+ try {
+ result = client.test(false);
+ } catch (Exception e) {
+ fail("No exception should have been thrown.");
+ }
+ assertEquals(result, UNDELAYED);
- boolean caughtException = false;
- try {
- result = client.test(true);
- } catch(Exception e) {
- // Exception thrown by server is enclosed in a RemoteException.
- if (e.getCause().getMessage().startsWith(
- "java.lang.Exception: Something went wrong"))
- caughtException = true;
+ boolean caughtException = false;
+ try {
+ result = client.test(true);
+ } catch(Exception e) {
+ // Exception thrown by server is enclosed in a RemoteException.
+ if (e.getCause().getMessage().startsWith(
+ "java.lang.Exception: Something went wrong"))
+ caughtException = true;
+ }
+ assertTrue(caughtException);
+ } finally {
+ if (rpcEngine != null) {
+ rpcEngine.close();
+ }
}
- assertTrue(caughtException);
}
/**
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java Tue Jan 29 00:50:02 2013
@@ -83,18 +83,18 @@ public class TestPBOnWritableRpc {
RpcServer rpcServer = HBaseRPC.getServer(new TestImpl(),
new Class<?>[] {TestProtocol.class},
"localhost", // BindAddress is IP we got for this server.
- 9999, // port number
+ 0, // port number
2, // number of handlers
0, // we dont use high priority handlers in master
conf.getBoolean("hbase.rpc.verbose", false), conf,
0);
- TestProtocol proxy = null;
+ RpcEngine rpcEngine = null;
try {
rpcServer.start();
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
- InetSocketAddress isa =
- new InetSocketAddress("localhost", 9999);
- proxy = (TestProtocol) HBaseRPC.waitForProxy(
+ InetSocketAddress isa = rpcServer.getListenerAddress();
+ TestProtocol proxy = HBaseRPC.waitForProxy(rpcEngine,
TestProtocol.class, TestProtocol.VERSION,
isa, conf, -1, 8000, 8000);
@@ -118,8 +118,8 @@ public class TestPBOnWritableRpc {
assertNotSame(sendProto, retProto);
} finally {
rpcServer.stop();
- if(proxy != null) {
- HBaseRPC.stopProxy(proxy);
+ if (rpcEngine != null) {
+ rpcEngine.close();
}
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/ipc/TestProtocolExtension.java Tue Jan 29 00:50:02 2013
@@ -80,12 +80,13 @@ public class TestProtocolExtension {
6016,
10, 10, false,
conf, 10);
- TestProtocol proxy = null;
+ RpcEngine rpcEngine = null;
try {
server.start();
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
InetSocketAddress addr = server.getListenerAddress();
- proxy = (TestProtocol)HBaseRPC.getProxy(
+ TestProtocol proxy = rpcEngine.getProxy(
TestProtocol.class, TestProtocol.VERSION, addr, conf, 10000);
proxy.ping();
@@ -93,7 +94,9 @@ public class TestProtocolExtension {
proxy.logClassName();
} finally {
server.stop();
- if(proxy!=null) HBaseRPC.stopProxy(proxy);
+ if (rpcEngine != null) {
+ rpcEngine.close();
+ }
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Tue Jan 29 00:50:02 2013
@@ -145,7 +145,7 @@ public class TestCatalogJanitor {
this.ct.stop();
}
if (this.connection != null) {
- HConnectionManager.deleteConnection(this.connection.getConfiguration(), true);
+ HConnectionManager.deleteConnection(this.connection.getConfiguration());
}
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java Tue Jan 29 00:50:02 2013
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.master;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -29,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.RpcEngine;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -47,9 +49,11 @@ public class TestHMasterRPCException {
ServerName sm = hm.getServerName();
InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
+ RpcEngine rpcEngine = null;
try {
- HMasterInterface inf = (HMasterInterface) HBaseRPC.getProxy(
- HMasterInterface.class, HMasterInterface.VERSION, isa, conf, 100 * 10);
+ rpcEngine = HBaseRPC.getProtocolEngine(conf);
+ HMasterInterface inf = rpcEngine.getProxy(
+ HMasterInterface.class, HMasterInterface.VERSION, isa, conf, 100 * 10);
inf.isMasterRunning();
fail();
} catch (RemoteException ex) {
@@ -57,6 +61,10 @@ public class TestHMasterRPCException {
"org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet"));
} catch (Throwable t) {
fail("Unexpected throwable: " + t);
+ } finally {
+ if (rpcEngine != null) {
+ rpcEngine.close();
+ }
}
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java Tue Jan 29 00:50:02 2013
@@ -102,7 +102,7 @@ public class OfflineMetaRebuildTestCore
@After
public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster();
- HConnectionManager.deleteConnection(conf, true);
+ HConnectionManager.deleteConnection(conf);
}
/**
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java?rev=1439723&r1=1439722&r2=1439723&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/hbck/TestOfflineMetaRebuildBase.java Tue Jan 29 00:50:02 2013
@@ -57,7 +57,7 @@ public class TestOfflineMetaRebuildBase
// shutdown the minicluster
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniZKCluster();
- HConnectionManager.deleteConnection(conf, false);
+ HConnectionManager.deleteConnection(conf);
// rebuild meta table from scratch
HBaseFsck fsck = new HBaseFsck(conf);