You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/07/30 09:07:18 UTC
svn commit: r1367009 [1/4] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/monitoring/
main/java/org/apache/hadoop/hbase/protobuf/generated/ main/jav...
Author: stack
Date: Mon Jul 30 07:07:17 2012
New Revision: 1367009
URL: http://svn.apache.org/viewvc?rev=1367009&view=rev
Log:
HBASE-5705 Introduce Protocol Buffer RPC engine
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java
hbase/trunk/hbase-server/src/test/protobuf/
hbase/trunk/hbase-server/src/test/protobuf/test.proto
hbase/trunk/hbase-server/src/test/protobuf/test_rpc_service.proto
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/protobuf/RPC.proto
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1367009&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Mon Jul 30 07:07:17 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Cache a client using its socket factory as the hash key.
+ * Enables reuse/sharing of clients on a per SocketFactory basis. A client
+ * establishes certain configuration dependent characteristics like timeouts,
+ * tcp-keepalive (true or false), etc. For more details on the characteristics,
+ * look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
+ * Creation of dynamic proxies to protocols creates the clients (and increments
+ * reference count once created), and stopping of the proxies leads to clearing
+ * out references and when the reference drops to zero, the cache mapping is
+ * cleared.
+ */
+class ClientCache {
+ private Map<SocketFactory, HBaseClient> clients =
+ new HashMap<SocketFactory, HBaseClient>();
+
+ protected ClientCache() {}
+
+ /**
+ * Construct & cache an IPC client with the user-provided SocketFactory
+ * if no cached client exists.
+ *
+ * @param conf Configuration
+ * @param factory socket factory
+ * @return an IPC client
+ */
+ protected synchronized HBaseClient getClient(Configuration conf,
+ SocketFactory factory) {
+ return getClient(conf, factory, HbaseObjectWritable.class);
+ }
+ protected synchronized HBaseClient getClient(Configuration conf,
+ SocketFactory factory, Class<? extends Writable> valueClass) {
+ HBaseClient client = clients.get(factory);
+ if (client == null) {
+ // Make an hbase client instead of hadoop Client.
+ client = new HBaseClient(valueClass, conf, factory);
+ clients.put(factory, client);
+ } else {
+ client.incCount();
+ }
+ return client;
+ }
+
+ /**
+ * Stop an RPC client connection.
+ * An RPC client is closed only when its reference count becomes zero.
+ * @param client client to stop
+ */
+ protected void stopClient(HBaseClient client) {
+ synchronized (this) {
+ client.decCount();
+ if (client.isZeroReference()) {
+ clients.remove(client.getSocketFactory());
+ }
+ }
+ if (client.isZeroReference()) {
+ client.stop();
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Mon Jul 30 07:07:17 2012
@@ -53,12 +53,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
@@ -82,7 +82,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.util.ReflectionUtils;
-import com.google.protobuf.ByteString;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -826,17 +825,15 @@ public class HBaseClient {
try {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
- RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder();
+ RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
builder.setCallId(call.id);
- Invocation invocation = (Invocation)call.param;
DataOutputBuffer d = new DataOutputBuffer();
- invocation.write(d);
- builder.setRequest(ByteString.copyFrom(d.getData()));
+ builder.build().writeDelimitedTo(d);
+ call.param.write(d);
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- RpcRequest obj = builder.build();
- this.out.writeInt(obj.getSerializedSize());
- obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
+ this.out.writeInt(d.getLength());
+ this.out.write(d.getData(), 0, d.getLength());
this.out.flush();
}
} catch(IOException e) {
@@ -859,7 +856,7 @@ public class HBaseClient {
// so the exception name/trace), and the response bytes
// Read the call id.
- RpcResponse response = RpcResponse.parseDelimitedFrom(in);
+ RpcResponseHeader response = RpcResponseHeader.parseDelimitedFrom(in);
if (response == null) {
// When the stream is closed, protobuf doesn't raise an EOFException,
// instead, it returns a null message object.
@@ -873,11 +870,8 @@ public class HBaseClient {
Status status = response.getStatus();
if (status == Status.SUCCESS) {
- ByteString responseObj = response.getResponse();
- DataInputStream dis =
- new DataInputStream(responseObj.newInput());
Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(dis); // read value
+ value.readFields(in); // read value
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (call != null) {
@@ -885,18 +879,20 @@ public class HBaseClient {
}
calls.remove(id);
} else if (status == Status.ERROR) {
+ RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
if (call != null) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException(
- response.getException().getExceptionName(),
- response.getException().getStackTrace()));
+ exceptionResponse.getExceptionName(),
+ exceptionResponse.getStackTrace()));
calls.remove(id);
}
} else if (status == Status.FATAL) {
+ RpcException exceptionResponse = RpcException.parseDelimitedFrom(in);
// Close the connection
markClosed(new RemoteException(
- response.getException().getExceptionName(),
- response.getException().getStackTrace()));
+ exceptionResponse.getExceptionName(),
+ exceptionResponse.getStackTrace()));
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Mon Jul 30 07:07:17 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Reflection
import javax.net.SocketFactory;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@@ -101,6 +102,13 @@ public class HBaseRPC {
}
};
+ static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field versionField = protocol.getField("VERSION");
+ versionField.setAccessible(true);
+ return versionField.getLong(protocol);
+ }
+
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
@@ -333,16 +341,21 @@ public class HBaseRPC {
long clientVersion, InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
- VersionedProtocol proxy =
- getProtocolEngine(protocol,conf)
- .getProxy(protocol, clientVersion, addr, ticket, conf, factory, Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- }
- throw new VersionMismatch(protocol.getName(), clientVersion,
+ RpcEngine engine = getProtocolEngine(protocol,conf);
+ VersionedProtocol proxy = engine
+ .getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
+ if (engine instanceof WritableRpcEngine) {
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
+ if (serverVersion == clientVersion) {
+ return proxy;
+ }
+
+ throw new VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
+ }
+ return proxy;
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Mon Jul 30 07:07:17 2012
@@ -73,11 +73,11 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse.Status;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
@@ -381,23 +381,21 @@ public abstract class HBaseServer implem
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
+ DataOutputStream out = new DataOutputStream(buf);
try {
- RpcResponse.Builder builder = RpcResponse.newBuilder();
+ RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
// Call id.
builder.setCallId(this.id);
builder.setStatus(status);
+ builder.build().writeDelimitedTo(out);
if (error != null) {
RpcException.Builder b = RpcException.newBuilder();
b.setExceptionName(errorClass);
b.setStackTrace(error);
- builder.setException(b.build());
+ b.build().writeDelimitedTo(out);
} else {
- DataOutputBuffer d = new DataOutputBuffer(size);
- result.write(d);
- byte[] response = d.getData();
- builder.setResponse(ByteString.copyFrom(response));
+ result.write(out);
}
- builder.build().writeDelimitedTo(buf);
if (connection.useWrap) {
wrapWithSasl(buf);
}
@@ -1616,9 +1614,10 @@ public abstract class HBaseServer implem
}
protected void processData(byte[] buf) throws IOException, InterruptedException {
- RpcRequest request = RpcRequest.parseFrom(buf);
+ DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(buf));
+ RpcRequestHeader request = RpcRequestHeader.parseDelimitedFrom(dis);
int id = request.getCallId();
- ByteString clientRequest = request.getRequest();
long callSize = buf.length;
if (LOG.isDebugEnabled()) {
@@ -1639,8 +1638,6 @@ public abstract class HBaseServer implem
Writable param;
try {
- DataInputStream dis =
- new DataInputStream(clientRequest.newInput());
param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis);
} catch (Throwable t) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Mon Jul 30 07:07:17 2012
@@ -57,7 +57,7 @@ public class Invocation extends Versione
// For generated protocol classes which don't have VERSION field,
// such as protobuf interfaces.
- private static final Map<Class<?>, Long>
+ static final Map<Class<?>, Long>
PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
static {
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java?rev=1367009&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java Mon Jul 30 07:07:17 2012
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.hbase.util.Objects;
+import org.apache.hadoop.hbase.util.ProtoUtil;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+/**
+ * The {@link RpcEngine} implementation for ProtoBuf-based RPCs.
+ */
+@InterfaceAudience.Private
+class ProtobufRpcEngine implements RpcEngine {
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
+ protected final static ClientCache CLIENTS = new ClientCache();
+ @Override
+ public VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, User ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+ rpcTimeout);
+ return (VersionedProtocol)Proxy.newProxyInstance(
+ protocol.getClassLoader(), new Class[]{protocol}, invoker);
+ }
+
+ @Override
+ public void stopProxy(VersionedProtocol proxy) {
+ if (proxy!=null) {
+ ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+ }
+ }
+
+ @Override
+ public Server getServer(Class<? extends VersionedProtocol> protocol,
+ Object instance, Class<?>[] ifaces, String bindAddress, int port,
+ int numHandlers, int metaHandlerCount, boolean verbose,
+ Configuration conf, int highPriorityLevel) throws IOException {
+ return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
+ metaHandlerCount, verbose, highPriorityLevel);
+ }
+ private static class Invoker implements InvocationHandler {
+ private final Map<String, Message> returnTypes =
+ new ConcurrentHashMap<String, Message>();
+ private Class<? extends VersionedProtocol> protocol;
+ private InetSocketAddress address;
+ private User ticket;
+ private HBaseClient client;
+ private boolean isClosed = false;
+ final private int rpcTimeout;
+ private final long clientProtocolVersion;
+
+ public Invoker(Class<? extends VersionedProtocol> protocol,
+ InetSocketAddress addr, User ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ this.protocol = protocol;
+ this.address = addr;
+ this.ticket = ticket;
+ this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
+ this.rpcTimeout = rpcTimeout;
+ Long version = Invocation.PROTOCOL_VERSION.get(protocol);
+ if (version != null) {
+ this.clientProtocolVersion = version;
+ } else {
+ try {
+ this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Exception encountered during " +
+ protocol, e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Exception encountered during " +
+ protocol, e);
+ }
+ }
+ }
+
+ private RpcRequestBody constructRpcRequest(Method method,
+ Object[] params) throws ServiceException {
+ RpcRequestBody rpcRequest;
+ RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
+ builder.setMethodName(method.getName());
+ Message param;
+ int length = params.length;
+ if (length == 2) {
+ // RpcController + Message in the method args
+ // (generated code from RPC bits in .proto files have RpcController)
+ param = (Message)params[1];
+ } else if (length == 1) { // Message
+ param = (Message)params[0];
+ } else {
+ throw new ServiceException("Too many parameters for request. Method: ["
+ + method.getName() + "]" + ", Expected: 2, Actual: "
+ + params.length);
+ }
+ builder.setRequest(param.toByteString());
+ builder.setClientProtocolVersion(clientProtocolVersion);
+ rpcRequest = builder.build();
+ return rpcRequest;
+ }
+
+ /**
+ * This is the client side invoker of RPC method. It only throws
+ * ServiceException, since the invocation proxy expects only
+ * ServiceException to be thrown by the method in case of a protobuf service.
+ *
+ * ServiceException has the following causes:
+ * <ol>
+ * <li>Exceptions encountered on the client side in this method are
+ * set as cause in ServiceException as is.</li>
+ * <li>Exceptions from the server are wrapped in RemoteException and are
+ * set as cause in ServiceException</li>
+ * </ol>
+ *
+ * Note that the client calling protobuf RPC methods, must handle
+ * ServiceException by getting the cause from the ServiceException. If the
+ * cause is RemoteException, then unwrap it to get the exception thrown by
+ * the server.
+ */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws ServiceException {
+ long startTime = 0;
+ if (LOG.isDebugEnabled()) {
+ startTime = System.currentTimeMillis();
+ }
+
+ RpcRequestBody rpcRequest = constructRpcRequest(method, args);
+ RpcResponseWritable val = null;
+ try {
+ val = (RpcResponseWritable) client.call(
+ new RpcRequestWritable(rpcRequest), address, protocol, ticket,
+ rpcTimeout);
+
+ if (LOG.isDebugEnabled()) {
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+
+ Message protoType = null;
+ protoType = getReturnProtoType(method);
+ Message returnMessage;
+ returnMessage = protoType.newBuilderForType()
+ .mergeFrom(val.responseMessage).build();
+ return returnMessage;
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ synchronized protected void close() {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
+
+ private Message getReturnProtoType(Method method) throws Exception {
+ if (returnTypes.containsKey(method.getName())) {
+ return returnTypes.get(method.getName());
+ }
+
+ Class<?> returnType = method.getReturnType();
+ Method newInstMethod = returnType.getMethod("getDefaultInstance");
+ newInstMethod.setAccessible(true);
+ Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+ returnTypes.put(method.getName(), protoType);
+ return protoType;
+ }
+ }
+
+ /**
+ * Writable Wrapper for Protocol Buffer Requests
+ */
+ private static class RpcRequestWritable implements Writable {
+ RpcRequestBody message;
+
+ @SuppressWarnings("unused")
+ public RpcRequestWritable() {
+ }
+
+ RpcRequestWritable(RpcRequestBody message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ ((Message)message).writeDelimitedTo(
+ DataOutputOutputStream.constructOutputStream(out));
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = ProtoUtil.readRawVarint32(in);
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ message = RpcRequestBody.parseFrom(bytes);
+ }
+
+ public int getSerializedSize() {
+ return message.getSerializedSize();
+ }
+
+ @Override
+ public String toString() {
+ return " Client Protocol Version: " +
+ message.getClientProtocolVersion() + " MethodName: " +
+ message.getMethodName();
+ }
+ }
+
+ /**
+ * Writable Wrapper for Protocol Buffer Responses
+ */
+ private static class RpcResponseWritable implements Writable {
+ byte[] responseMessage;
+
+ @SuppressWarnings("unused")
+ public RpcResponseWritable() {
+ }
+
+ public RpcResponseWritable(Message message) {
+ this.responseMessage = message.toByteArray();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(responseMessage.length);
+ out.write(responseMessage);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ responseMessage = bytes;
+ }
+ }
+ public static class Server extends WritableRpcEngine.Server {
+ boolean verbose;
+ Object instance;
+ Class<?> implementation;
+ private static final String WARN_RESPONSE_TIME =
+ "hbase.ipc.warn.response.time";
+ private static final String WARN_RESPONSE_SIZE =
+ "hbase.ipc.warn.response.size";
+
+ /** Default value for above params */
+ private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
+ private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+ /** Names for suffixed metrics */
+ private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
+
+ private final int warnResponseTime;
+ private final int warnResponseSize;
+ public Server(Object instance, final Class<?>[] ifaces,
+ Configuration conf, String bindAddress, int port,
+ int numHandlers, int metaHandlerCount, boolean verbose,
+ int highPriorityLevel)
+ throws IOException {
+ super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
+ numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+ this.verbose = verbose;
+ this.instance = instance;
+ this.implementation = instance.getClass();
+ // create metrics for the advertised interfaces this server implements.
+ String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
+ this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
+
+ this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
+ DEFAULT_WARN_RESPONSE_TIME);
+ this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+ DEFAULT_WARN_RESPONSE_SIZE);
+ }
+ private final Map<String, Message> methodArg =
+ new ConcurrentHashMap<String, Message>();
+ private final Map<String, Method> methodInstances =
+ new ConcurrentHashMap<String, Method>();
+ @Override
+ /**
+ * This is a server side method, which is invoked over RPC. On success
+ * the return response has protobuf response payload. On failure, the
+ * exception name and the stack trace are returned in the protobuf response.
+ */
+ public Writable call(Class<? extends VersionedProtocol> protocol,
+ Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
+ throws IOException {
+ try {
+ RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+ RpcRequestBody rpcRequest = request.message;
+ String methodName = rpcRequest.getMethodName();
+ Method method = getMethod(protocol, methodName);
+ if (method == null) {
+ throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
+ " doesn't exist in protocol " + protocol.getName());
+ }
+
+ /**
+ * RPCs for a particular interface (i.e. protocol) are done using an
+ * IPC connection that is set up using rpcProxy.
+ * The rpcProxy has a declared protocol name that is
+ * sent from client to server at connection time.
+ */
+ //TODO: use the clientVersion to do protocol compatibility checks, and
+ //this could be used here to handle complex use cases like deciding
+ //which implementation of the protocol should be used to service the
+ //current request, etc. Ideally, we shouldn't land up in a situation
+ //where we need to support such a use case.
+ //For now the clientVersion field is simply ignored
+ long clientVersion = rpcRequest.getClientProtocolVersion();
+
+ if (verbose) {
+ LOG.info("Call: protocol name=" + protocol.getName() +
+ ", method=" + methodName);
+ }
+
+ status.setRPC(rpcRequest.getMethodName(),
+ new Object[]{rpcRequest.getRequest()}, receiveTime);
+ status.setRPCPacket(writableRequest);
+ status.resume("Servicing call");
+ //get an instance of the method arg type
+ Message protoType = getMethodArgType(method);
+ Message param = protoType.newBuilderForType()
+ .mergeFrom(rpcRequest.getRequest()).build();
+ Message result;
+ Object impl = null;
+ if (protocol.isAssignableFrom(this.implementation)) {
+ impl = this.instance;
+ } else {
+ throw new HBaseRPC.UnknownProtocolException(protocol);
+ }
+
+ long startTime = System.currentTimeMillis();
+ if (method.getParameterTypes().length == 2) {
+ // RpcController + Message in the method args
+ // (generated code from RPC bits in .proto files have RpcController)
+ result = (Message)method.invoke(impl, null, param);
+ } else if (method.getParameterTypes().length == 1) {
+ // Message (hand written code usually has only a single argument)
+ result = (Message)method.invoke(impl, param);
+ } else {
+ throw new ServiceException("Too many parameters for method: ["
+ + method.getName() + "]" + ", allowed (at most): 2, Actual: "
+ + method.getParameterTypes().length);
+ }
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime-receiveTime);
+ if (TRACELOG.isDebugEnabled()) {
+ TRACELOG.debug("Call #" + CurCall.get().id +
+ "; Served: " + protocol.getSimpleName()+"#"+method.getName() +
+ " queueTime=" + qTime +
+ " processingTime=" + processingTime +
+ " contents=" + Objects.describeQuantity(param));
+ }
+ rpcMetrics.rpcQueueTime.inc(qTime);
+ rpcMetrics.rpcProcessingTime.inc(processingTime);
+ rpcMetrics.inc(method.getName(), processingTime);
+ if (verbose) {
+ WritableRpcEngine.log("Return: "+result, LOG);
+ }
+ long responseSize = result.getSerializedSize();
+ // log any RPC responses that are slower than the configured warn
+ // response time or larger than configured warning size
+ boolean tooSlow = (processingTime > warnResponseTime
+ && warnResponseTime > -1);
+ boolean tooLarge = (responseSize > warnResponseSize
+ && warnResponseSize > -1);
+ if (tooSlow || tooLarge) {
+ // when tagging, we let TooLarge trump TooSlow to keep output simple
+ // note that large responses will often also be slow.
+ StringBuilder buffer = new StringBuilder(256);
+ buffer.append(methodName);
+ buffer.append("(");
+ buffer.append(param.getClass().getName());
+ buffer.append(")");
+ buffer.append(", client version="+clientVersion);
+ logResponse(new Object[]{rpcRequest.getRequest()},
+ methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
+ status.getClient(), startTime, processingTime, qTime,
+ responseSize);
+ // provides a count of log-reported slow responses
+ if (tooSlow) {
+ rpcMetrics.rpcSlowResponseTime.inc(processingTime);
+ }
+ }
+ if (processingTime > 1000) {
+ // we use a hard-coded one second period so that we can clearly
+ // indicate the time period we're warning about in the name of the
+ // metric itself
+ rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
+ processingTime);
+ }
+ return new RpcResponseWritable(result);
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException)target;
+ }
+ if (target instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)target);
+ }
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
+ throw ioe;
+ }
+ }
+
+ private Method getMethod(Class<? extends VersionedProtocol> protocol,
+ String methodName) {
+ Method method = methodInstances.get(methodName);
+ if (method != null) {
+ return method;
+ }
+ Method[] methods = protocol.getMethods();
+ LOG.warn("Methods length : " + methods.length);
+ for (Method m : methods) {
+ if (m.getName().equals(methodName)) {
+ m.setAccessible(true);
+ methodInstances.put(methodName, m);
+ return m;
+ }
+ }
+ return null;
+ }
+
+ private Message getMethodArgType(Method method) throws Exception {
+ Message protoType = methodArg.get(method.getName());
+ if (protoType != null) {
+ return protoType;
+ }
+
+ Class<?>[] args = method.getParameterTypes();
+ Class<?> arg;
+ if (args.length == 2) {
+ // RpcController + Message in the method args
+ // (generated code from RPC bits in .proto files have RpcController)
+ arg = args[1];
+ } else if (args.length == 1) {
+ arg = args[0];
+ } else {
+ //unexpected
+ return null;
+ }
+ //in the protobuf methods, args[1] is the only significant argument
+ Method newInstMethod = arg.getMethod("getDefaultInstance");
+ newInstMethod.setAccessible(true);
+ protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+ methodArg.put(method.getName(), protoType);
+ return protoType;
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Mon Jul 30 07:07:17 2012
@@ -28,18 +28,14 @@ import java.lang.reflect.UndeclaredThrow
import java.net.InetSocketAddress;
import java.io.*;
-import java.util.HashSet;
import java.util.Map;
import java.util.HashMap;
-import java.util.Set;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.AdminProtocol;
-import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -69,57 +65,6 @@ class WritableRpcEngine implements RpcEn
// DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
- /* Cache a client using its socket factory as the hash key */
- static private class ClientCache {
- private Map<SocketFactory, HBaseClient> clients =
- new HashMap<SocketFactory, HBaseClient>();
-
- protected ClientCache() {}
-
- /**
- * Construct & cache an IPC client with the user-provided SocketFactory
- * if no cached client exists.
- *
- * @param conf Configuration
- * @param factory socket factory
- * @return an IPC client
- */
- protected synchronized HBaseClient getClient(Configuration conf,
- SocketFactory factory) {
- // Construct & cache client. The configuration is only used for timeout,
- // and Clients have connection pools. So we can either (a) lose some
- // connection pooling and leak sockets, or (b) use the same timeout for
- // all configurations. Since the IPC is usually intended globally, not
- // per-job, we choose (a).
- HBaseClient client = clients.get(factory);
- if (client == null) {
- // Make an hbase client instead of hadoop Client.
- client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
- clients.put(factory, client);
- } else {
- client.incCount();
- }
- return client;
- }
-
- /**
- * Stop a RPC client connection
- * A RPC client is closed only when its reference count becomes zero.
- * @param client client to stop
- */
- protected void stopClient(HBaseClient client) {
- synchronized (this) {
- client.decCount();
- if (client.isZeroReference()) {
- clients.remove(client.getSocketFactory());
- }
- }
- if (client.isZeroReference()) {
- client.stop();
- }
- }
- }
-
protected final static ClientCache CLIENTS = new ClientCache();
private static class Invoker implements InvocationHandler {
@@ -150,8 +95,8 @@ class WritableRpcEngine implements RpcEn
try {
HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address,
- protocol, ticket, rpcTimeout);
+ client.call(new Invocation(method, args), address, protocol, ticket,
+ rpcTimeout);
if (logDebug) {
// FIGURE HOW TO TURN THIS OFF!
long callTime = System.currentTimeMillis() - startTime;
@@ -271,18 +216,23 @@ class WritableRpcEngine implements RpcEn
/** Construct an RPC server.
* @param instance the instance whose methods will be called
+ * @param ifaces the interfaces the server supports
+ * @param paramClass an instance of this class is used to read the RPC requests
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
+ * @param metaHandlerCount the number of meta handlers desired
* @param verbose whether each call should be logged
+ * @param highPriorityLevel the priority level this server treats as high priority RPCs
* @throws IOException e
*/
public Server(Object instance, final Class<?>[] ifaces,
+ Class<? extends Writable> paramClass,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel) throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount,
+ super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
conf, classNameBase(instance.getClass().getName()),
highPriorityLevel);
this.instance = instance;
@@ -301,6 +251,14 @@ class WritableRpcEngine implements RpcEn
DEFAULT_WARN_RESPONSE_SIZE);
}
+ /**
+  * Construct an RPC server whose request parameter class is
+  * {@code Invocation}. Every argument is forwarded unchanged to the
+  * constructor that takes an explicit parameter class.
+  * @param instance the instance whose methods will be called
+  * @param ifaces the interfaces the server supports
+  * @param conf the configuration to use
+  * @param bindAddress the address to bind on to listen for connection
+  * @param port the port to listen for connections on
+  * @param numHandlers the number of method handler threads to run
+  * @param metaHandlerCount the number of meta handlers desired
+  * @param verbose whether each call should be logged
+  * @param highPriorityLevel the priority level this server treats as high priority RPCs
+  * @throws IOException e
+  */
+ public Server(Object instance, final Class<?>[] ifaces,
+ Configuration conf, String bindAddress, int port,
+ int numHandlers, int metaHandlerCount, boolean verbose,
+ int highPriorityLevel) throws IOException {
+ this(instance, ifaces, Invocation.class, conf, bindAddress, port,
+ numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+ }
+
public AuthenticationTokenSecretManager createSecretManager(){
if (!User.isSecurityEnabled() ||
!(instance instanceof org.apache.hadoop.hbase.Server)) {
@@ -341,7 +299,7 @@ class WritableRpcEngine implements RpcEn
throw new IOException("Could not find requested method, the usual " +
"cause is a version mismatch between client and server.");
}
- if (verbose) log("Call: " + call);
+ if (verbose) log("Call: " + call, LOG);
status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
status.setRPCPacket(param);
status.resume("Servicing call");
@@ -389,7 +347,7 @@ class WritableRpcEngine implements RpcEn
rpcMetrics.rpcQueueTime.inc(qTime);
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(call.getMethodName(), processingTime);
- if (verbose) log("Return: "+value);
+ if (verbose) log("Return: "+value, LOG);
HbaseObjectWritable retVal =
new HbaseObjectWritable(method.getReturnType(), value);
@@ -403,7 +361,8 @@ class WritableRpcEngine implements RpcEn
if (tooSlow || tooLarge) {
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
- logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
+ logResponse(call.getParameters(), call.getMethodName(),
+ call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
responseSize);
// provides a count of log-reported slow responses
@@ -444,7 +403,9 @@ class WritableRpcEngine implements RpcEn
/**
* Logs an RPC response to the LOG file, producing valid JSON objects for
* client Operations.
- * @param call The call to log.
+ * @param params The parameters received in the call.
+ * @param methodName The name of the method invoked
+ * @param call The string representation of the call
* @param tag The tag that will be used to indicate this event in the log.
* @param client The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms.
@@ -453,10 +414,10 @@ class WritableRpcEngine implements RpcEn
* prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer.
*/
- private void logResponse(Invocation call, String tag, String clientAddress,
- long startTime, int processingTime, int qTime, long responseSize)
+ void logResponse(Object[] params, String methodName, String call, String tag,
+ String clientAddress, long startTime, int processingTime, int qTime,
+ long responseSize)
throws IOException {
- Object params[] = call.getParameters();
// for JSON encoding
ObjectMapper mapper = new ObjectMapper();
// base information that is reported regardless of type of call
@@ -467,7 +428,7 @@ class WritableRpcEngine implements RpcEn
responseInfo.put("responsesize", responseSize);
responseInfo.put("client", clientAddress);
responseInfo.put("class", instance.getClass().getSimpleName());
- responseInfo.put("method", call.getMethodName());
+ responseInfo.put("method", methodName);
if (params.length == 2 && instance instanceof HRegionServer &&
params[0] instanceof byte[] &&
params[1] instanceof Operation) {
@@ -491,14 +452,14 @@ class WritableRpcEngine implements RpcEn
} else {
// can't get JSON details, so just report call.toString() along with
// a more generic tag.
- responseInfo.put("call", call.toString());
+ responseInfo.put("call", call);
LOG.warn("(response" + tag + "): " +
mapper.writeValueAsString(responseInfo));
}
}
}
- protected static void log(String value) {
+ protected static void log(String value, Log LOG) {
String v = value;
if (v != null && v.length() > 55)
v = v.substring(0, 55)+"...";
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Jul 30 07:07:17 2012
@@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.executor.
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -99,8 +101,6 @@ import org.apache.hadoop.hbase.master.me
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -331,7 +331,7 @@ Server {
}
int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25));
- this.rpcServer = HBaseRPC.getServer(this,
+ this.rpcServer = HBaseRPC.getServer(MasterMonitorProtocol.class, this,
new Class<?>[]{MasterMonitorProtocol.class,
MasterAdminProtocol.class, RegionServerStatusProtocol.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1367009&r1=1367008&r2=1367009&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Mon Jul 30 07:07:17 2012
@@ -261,5 +261,4 @@ public class MonitoredRPCHandlerImpl ext
}
return super.toString() + ", rpcMethod=" + getRPC();
}
-
}