You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2009/05/15 09:05:26 UTC
svn commit: r775039 - in
/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop:
hbase/client/ hbase/ipc/ hbase/master/ hbase/regionserver/ ipc/
Author: apurtell
Date: Fri May 15 07:05:26 2009
New Revision: 775039
URL: http://svn.apache.org/viewvc?rev=775039&view=rev
Log:
roll back RPC bits to 0.18 version, then bring forward just a bit with versioned interface, error handler, and HBaseObjectWritable
Added:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java
Removed:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri May 15 07:05:26 2009
@@ -777,7 +777,7 @@
server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
- this.maxRPCAttempts, this.rpcTimeout);
+ this.maxRPCAttempts);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri May 15 07:05:26 2009
@@ -21,7 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -29,20 +28,9 @@
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
import java.net.SocketTimeoutException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Map;
import javax.net.SocketFactory;
@@ -53,11 +41,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.HBaseClient;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
/** A simple RPC mechanism.
*
@@ -88,33 +79,14 @@
// Leave this out in the hadoop ipc package but keep class name. Do this
// so that we dont' get the logging of this class's invocations by doing our
// blanket enabling DEBUG on the o.a.h.h. package.
- protected static final Log LOG =
+ private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
- private HBaseRPC() {
- super();
- } // no public ctor
+ private HBaseRPC() {} // no public ctor
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
- // Here, for hbase, we maintain two static maps of method names to code and
- // vice versa.
- private static final Map<Byte, String> CODE_TO_METHODNAME =
- new HashMap<Byte, String>();
- private static final Map<String, Byte> METHODNAME_TO_CODE =
- new HashMap<String, Byte>();
- // Special code that means 'not-encoded'.
- private static final byte NOT_ENCODED = 0;
- static {
- byte code = NOT_ENCODED + 1;
- code = addToMap(VersionedProtocol.class, code);
- code = addToMap(HMasterInterface.class, code);
- code = addToMap(HMasterRegionInterface.class, code);
- code = addToMap(TransactionalRegionInterface.class, code);
- }
- // End of hbase modifications.
-
private String methodName;
@SuppressWarnings("unchecked")
private Class[] parameterClasses;
@@ -122,9 +94,7 @@
private Configuration conf;
/** default constructor */
- public Invocation() {
- super();
- }
+ public Invocation() {}
/**
* @param method
@@ -147,20 +117,18 @@
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
- byte code = in.readByte();
- methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
+ methodName = Text.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
HbaseObjectWritable objectWritable = new HbaseObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
- this.conf);
+ parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
- writeMethodNameCode(out, this.methodName);
+ Text.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -170,7 +138,7 @@
@Override
public String toString() {
- StringBuilder buffer = new StringBuilder(256);
+ StringBuffer buffer = new StringBuffer();
buffer.append(methodName);
buffer.append("(");
for (int i = 0; i < parameters.length; i++) {
@@ -189,64 +157,13 @@
public Configuration getConf() {
return this.conf;
}
-
- // Hbase additions.
- private static void addToMap(final String name, final byte code) {
- if (METHODNAME_TO_CODE.containsKey(name)) {
- return;
- }
- METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
- CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
- }
-
- /*
- * @param c Class whose methods we'll add to the map of methods to codes
- * (and vice versa).
- * @param code Current state of the byte code.
- * @return State of <code>code</code> when this method is done.
- */
- private static byte addToMap(final Class<?> c, final byte code) {
- byte localCode = code;
- Method [] methods = c.getMethods();
- // There are no guarantees about the order in which items are returned in
- // so do a sort (Was seeing that sort was one way on one server and then
- // another on different server).
- Arrays.sort(methods, new Comparator<Method>() {
- public int compare(Method left, Method right) {
- return left.getName().compareTo(right.getName());
- }
- });
- for (int i = 0; i < methods.length; i++) {
- addToMap(methods[i].getName(), localCode++);
- }
- return localCode;
- }
- /*
- * Write out the code byte for passed Class.
- * @param out
- * @param c
- * @throws IOException
- */
- static void writeMethodNameCode(final DataOutput out, final String methodname)
- throws IOException {
- Byte code = METHODNAME_TO_CODE.get(methodname);
- if (code == null) {
- LOG.error("Unsupported type " + methodname);
- throw new UnsupportedOperationException("No code for unexpected " +
- methodname);
- }
- out.writeByte(code.byteValue());
- }
- // End of hbase additions.
}
/* Cache a client using its socket factory as the hash key */
static private class ClientCache {
- private Map<SocketFactory, HBaseClient> clients =
- new HashMap<SocketFactory, HBaseClient>();
-
- protected ClientCache() {}
+ private Map<SocketFactory, Client> clients =
+ new HashMap<SocketFactory, Client>();
/**
* Construct & cache an IPC client with the user-provided SocketFactory
@@ -255,20 +172,19 @@
* @param conf Configuration
* @return an IPC client
*/
- protected synchronized HBaseClient getClient(Configuration conf,
+ private synchronized Client getClient(Configuration conf,
SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
- HBaseClient client = clients.get(factory);
+ Client client = clients.get(factory);
if (client == null) {
- // Make an hbase client instead of hadoop Client.
client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
- client.incCount();
+ ((HBaseClient)client).incCount();
}
return client;
}
@@ -280,7 +196,7 @@
* @param conf Configuration
* @return an IPC client
*/
- protected synchronized HBaseClient getClient(Configuration conf) {
+ private synchronized Client getClient(Configuration conf) {
return getClient(conf, SocketFactory.getDefault());
}
@@ -288,25 +204,25 @@
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
*/
- protected void stopClient(HBaseClient client) {
+ private void stopClient(Client client) {
synchronized (this) {
- client.decCount();
- if (client.isZeroReference()) {
- clients.remove(client.getSocketFactory());
+ ((HBaseClient)client).decCount();
+ if (((HBaseClient)client).isZeroReference()) {
+ clients.remove(((HBaseClient)client).getSocketFactory());
}
}
- if (client.isZeroReference()) {
+ if (((HBaseClient)client).isZeroReference()) {
client.stop();
}
}
}
- protected final static ClientCache CLIENTS = new ClientCache();
+ private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private UserGroupInformation ticket;
- private HBaseClient client;
+ private Client client;
private boolean isClosed = false;
/**
@@ -324,22 +240,16 @@
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- final boolean logDebug = LOG.isDebugEnabled();
- long startTime = 0;
- if (logDebug) {
- startTime = System.currentTimeMillis();
- }
+ long startTime = System.currentTimeMillis();
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
- if (logDebug) {
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
- }
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
- synchronized protected void close() {
+ synchronized private void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
@@ -409,19 +319,14 @@
long clientVersion,
InetSocketAddress addr,
Configuration conf,
- int maxAttempts,
- long timeout
+ int maxAttempts
) throws IOException {
- // HBase does limited number of reconnects which is different from hadoop.
- long startTime = System.currentTimeMillis();
- IOException ioe;
int reconnectAttempts = 0;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf);
} catch(ConnectException se) { // namenode has not been started
LOG.info("Server at " + addr + " not available yet, Zzzzz...");
- ioe = se;
if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
LOG.info("Server at " + addr + " could not be reached after " +
reconnectAttempts + " tries, giving up.");
@@ -431,14 +336,7 @@
}
} catch(SocketTimeoutException te) { // namenode is busy
LOG.info("Problem connecting to server: " + addr);
- ioe = te;
}
- // check if timed out
- if (System.currentTimeMillis()-timeout >= startTime) {
- throw ioe;
- }
-
- // wait for retry
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
@@ -480,8 +378,8 @@
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory)
- throws IOException {
+ Configuration conf, SocketFactory factory) throws IOException {
+
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
@@ -490,9 +388,10 @@
clientVersion);
if (serverVersion == clientVersion) {
return proxy;
+ } else {
+ throw new VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
}
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
}
/**
@@ -540,7 +439,7 @@
Invocation[] invocations = new Invocation[params.length];
for (int i = 0; i < params.length; i++)
invocations[i] = new Invocation(method, params[i]);
- HBaseClient client = CLIENTS.getClient(conf);
+ Client client = CLIENTS.getClient(conf);
try {
Writable[] wrappedValues = client.call(invocations, addrs);
@@ -597,10 +496,11 @@
}
/** An RPC Server. */
- public static class Server extends HBaseServer {
+ public static class Server extends org.apache.hadoop.ipc.Server {
private Object instance;
private Class<?> implementation;
private boolean verbose;
+ private HBaseRPCErrorHandler handler;
/**
* Construct an RPC server.
@@ -645,6 +545,7 @@
try {
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
+
Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
@@ -653,16 +554,23 @@
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + call.getMethodName() +
+ LOG.debug("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
- rpcMetrics.rpcQueueTime.inc(qTime);
- rpcMetrics.rpcProcessingTime.inc(processingTime);
- }
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);
- rpcMetrics.inc(call.getMethodName(), processingTime);
+
+ MetricsTimeVaryingRate m = rpcMetrics.metricsList.get(call.getMethodName());
+
+ if (m != null) {
+ m.inc(processingTime);
+ } else {
+ rpcMetrics.metricsList.put(call.getMethodName(),
+ new MetricsTimeVaryingRate(call.getMethodName()));
+ m = rpcMetrics.metricsList.get(call.getMethodName());
+ m.inc(processingTime);
+ }
+
if (verbose) log("Return: "+value);
return new HbaseObjectWritable(method.getReturnType(), value);
@@ -671,9 +579,19 @@
Throwable target = e.getTargetException();
if (target instanceof IOException) {
throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (OutOfMemoryError e) {
+ if (handler != null) {
+ if (handler.checkOOME(e)) {
+ this.stop();
+ }
}
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
throw ioe;
} catch (Throwable e) {
IOException ioe = new IOException(e.toString());
@@ -681,283 +599,15 @@
throw ioe;
}
}
- }
- protected static void log(String value) {
- String v = value;
- if (v != null && v.length() > 55)
- v = v.substring(0, 55)+"...";
- LOG.info(v);
- }
-
- public static void connect(Socket socket,
- SocketAddress endpoint,
- int timeout) throws IOException {
- if (socket == null || endpoint == null || timeout < 0) {
- throw new IllegalArgumentException("Illegal argument for connect()");
- }
- SocketChannel ch = socket.getChannel();
- if (ch == null) {
- // let the default implementation handle it.
- socket.connect(endpoint, timeout);
- } else {
- connect(ch, endpoint, timeout);
+ public void setErrorHandler(HBaseRPCErrorHandler handler) {
+ this.handler = handler;
}
}
- private static SelectorPool selector = new SelectorPool();
-
- public static void connect(SocketChannel channel, SocketAddress endpoint,
- int timeout) throws IOException {
-
- boolean blockingOn = channel.isBlocking();
- if (blockingOn) {
- channel.configureBlocking(false);
- }
-
- try {
- if (channel.connect(endpoint)) {
- return;
- }
-
- long timeoutLeft = timeout;
- long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
-
- while (true) {
- // we might have to call finishConnect() more than once
- // for some channels (with user level protocols)
-
- int ret = selector.select((SelectableChannel)channel,
- SelectionKey.OP_CONNECT, timeoutLeft);
-
- if (ret > 0 && channel.finishConnect()) {
- return;
- }
-
- if (ret == 0 ||
- (timeout > 0 &&
- (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
- throw new SocketTimeoutException(
- timeoutExceptionString(channel, timeout,
- SelectionKey.OP_CONNECT));
- }
- }
- } catch (IOException e) {
- // javadoc for SocketChannel.connect() says channel should be closed.
- try {
- channel.close();
- } catch (IOException ignored) {}
- throw e;
- } finally {
- if (blockingOn && channel.isOpen()) {
- channel.configureBlocking(true);
- }
- }
+ private static void log(String value) {
+ if (value!= null && value.length() > 55)
+ value = value.substring(0, 55)+"...";
+ LOG.info(value);
}
-
- private static String timeoutExceptionString(SelectableChannel channel,
- long timeout, int ops) {
-
- String waitingFor;
- switch(ops) {
-
- case SelectionKey.OP_READ :
- waitingFor = "read"; break;
-
- case SelectionKey.OP_WRITE :
- waitingFor = "write"; break;
-
- case SelectionKey.OP_CONNECT :
- waitingFor = "connect"; break;
-
- default :
- waitingFor = "" + ops;
- }
-
- return timeout + " millis timeout while " +
- "waiting for channel to be ready for " +
- waitingFor + ". ch : " + channel;
- }
-
- /**
- * This maintains a pool of selectors. These selectors are closed
- * once they are idle (unused) for a few seconds.
- */
- private static class SelectorPool {
-
- private static class SelectorInfo {
- Selector selector;
- long lastActivityTime;
- LinkedList<SelectorInfo> queue;
-
- void close() {
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- LOG.warn("Unexpected exception while closing selector : " +
- StringUtils.stringifyException(e));
- }
- }
- }
- }
-
- private static class ProviderInfo {
- SelectorProvider provider;
- LinkedList<SelectorInfo> queue; // lifo
- ProviderInfo next;
- }
-
- private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
-
- private ProviderInfo providerList = null;
-
- /**
- * Waits on the channel with the given timeout using one of the
- * cached selectors. It also removes any cached selectors that are
- * idle for a few seconds.
- *
- * @param channel
- * @param ops
- * @param timeout
- * @return
- * @throws IOException
- */
- int select(SelectableChannel channel, int ops, long timeout)
- throws IOException {
-
- SelectorInfo info = get(channel);
-
- SelectionKey key = null;
- int ret = 0;
-
- try {
- while (true) {
- long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
- key = channel.register(info.selector, ops);
- ret = info.selector.select(timeout);
-
- if (ret != 0) {
- return ret;
- }
-
- /* Sometimes select() returns 0 much before timeout for
- * unknown reasons. So select again if required.
- */
- if (timeout > 0) {
- timeout -= System.currentTimeMillis() - start;
- if (timeout <= 0) {
- return 0;
- }
- }
-
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedIOException("Interruped while waiting for " +
- "IO on channel " + channel +
- ". " + timeout +
- " millis timeout left.");
- }
- }
- } finally {
- if (key != null) {
- key.cancel();
- }
-
- //clear the canceled key.
- try {
- info.selector.selectNow();
- } catch (IOException e) {
- LOG.info("Unexpected Exception while clearing selector : " +
- StringUtils.stringifyException(e));
- // don't put the selector back.
- info.close();
- return ret;
- }
-
- release(info);
- }
- }
-
- /**
- * Takes one selector from end of LRU list of free selectors.
- * If there are no selectors awailable, it creates a new selector.
- * Also invokes trimIdleSelectors().
- *
- * @param channel
- * @return
- * @throws IOException
- */
- private synchronized SelectorInfo get(SelectableChannel channel)
- throws IOException {
- SelectorInfo selInfo = null;
-
- SelectorProvider provider = channel.provider();
-
- // pick the list : rarely there is more than one provider in use.
- ProviderInfo pList = providerList;
- while (pList != null && pList.provider != provider) {
- pList = pList.next;
- }
- if (pList == null) {
- //LOG.info("Creating new ProviderInfo : " + provider.toString());
- pList = new ProviderInfo();
- pList.provider = provider;
- pList.queue = new LinkedList<SelectorInfo>();
- pList.next = providerList;
- providerList = pList;
- }
-
- LinkedList<SelectorInfo> queue = pList.queue;
-
- if (queue.isEmpty()) {
- Selector selector = provider.openSelector();
- selInfo = new SelectorInfo();
- selInfo.selector = selector;
- selInfo.queue = queue;
- } else {
- selInfo = queue.removeLast();
- }
-
- trimIdleSelectors(System.currentTimeMillis());
- return selInfo;
- }
-
- /**
- * puts selector back at the end of LRU list of free selectos.
- * Also invokes trimIdleSelectors().
- *
- * @param info
- */
- private synchronized void release(SelectorInfo info) {
- long now = System.currentTimeMillis();
- trimIdleSelectors(now);
- info.lastActivityTime = now;
- info.queue.addLast(info);
- }
-
- /**
- * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
- * traverse the whole list, just over the one that have crossed
- * the timeout.
- */
- private void trimIdleSelectors(long now) {
- long cutoff = now - IDLE_TIMEOUT;
-
- for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
- if (pList.queue.isEmpty()) {
- continue;
- }
- for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
- SelectorInfo info = it.next();
- if (info.lastActivityTime > cutoff) {
- break;
- }
- it.remove();
- info.close();
- }
- }
- }
- }
-
}
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/master/HMaster.java Fri May 15 07:05:26 2009
@@ -64,10 +64,10 @@
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.HBaseRPC.Server;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
@@ -123,7 +123,7 @@
volatile BlockingQueue<RegionServerOperation> toDoQueue =
new LinkedBlockingQueue<RegionServerOperation>();
- private final HBaseServer server;
+ private final Server server;
private final HServerAddress address;
final ServerConnection connection;
@@ -568,7 +568,7 @@
// use the IP given by the user.
if (serverInfo.getServerAddress().getBindAddress().equals(
DEFAULT_HOST)) {
- String rsAddress = HBaseServer.getRemoteAddress();
+ String rsAddress = Server.getRemoteAddress();
serverInfo.setServerAddress(new HServerAddress(rsAddress,
serverInfo.getServerAddress().getPort()));
}
@@ -585,7 +585,7 @@
protected MapWritable createConfigurationSubset() {
MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
// Get the real address of the HRS.
- String rsAddress = HBaseServer.getRemoteAddress();
+ String rsAddress = Server.getRemoteAddress();
if (rsAddress != null) {
mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
}
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=775039&r1=775038&r2=775039&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 15 07:05:26 2009
@@ -89,9 +89,9 @@
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.HBaseRPC.Server;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -168,7 +168,7 @@
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- HBaseServer server;
+ Server server;
// Leases
private Leases leases;
@@ -1296,8 +1296,7 @@
// should retry indefinitely.
master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
- masterAddress.getInetSocketAddress(),
- this.conf, -1, this.rpcTimeout);
+ masterAddress.getInetSocketAddress(), this.conf, -1);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();
Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java?rev=775039&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/ipc/HBaseClient.java Fri May 15 07:05:26 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Subclass of hadoop's Client just so we can make some methods accessible
+ * in {@link org.apache.hadoop.hbase.ipc.HbaseRPC}
+ */
+public class HBaseClient extends Client {
+ /**
+ * @param valueClass
+ * @param conf
+ * @param factory
+ */
+ public HBaseClient(Class valueClass, Configuration conf, SocketFactory factory) {
+ super(valueClass, conf, factory);
+ }
+
+ /**
+ * @param valueClass
+ * @param conf
+ */
+ public HBaseClient(Class<?> valueClass, Configuration conf) {
+ super(valueClass, conf);
+ }
+
+ @Override
+ public void incCount() {
+ super.incCount();
+ }
+
+ @Override
+ public void decCount() {
+ super.decCount();
+ }
+
+ @Override
+ public boolean isZeroReference() {
+ return super.isZeroReference();
+ }
+
+ @Override
+ public SocketFactory getSocketFactory() {
+ return super.getSocketFactory();
+ }
+}
\ No newline at end of file