You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/08/19 23:47:22 UTC
svn commit: r1374860 [1/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/monitoring/
main/java/org/apache/hadoop/hbase/protobuf/generated/
main/java/org/apache/hadoop/hbase/regionserver/ ma...
Author: stack
Date: Sun Aug 19 21:47:21 2012
New Revision: 1374860
URL: http://svn.apache.org/viewvc?rev=1374860&view=rev
Log:
HBASE-6414 Remove the WritableRpcEngine & associated Invocation classes
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
hbase/trunk/hbase-server/src/test/protobuf/test_delayed_rpc.proto
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/protobuf/RPC.proto
hbase/trunk/hbase-server/src/main/resources/hbase-default.xml
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Sun Aug 19 21:47:21 2012
@@ -54,14 +54,10 @@ class ClientCache {
*/
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
- return getClient(conf, factory, HbaseObjectWritable.class);
- }
- protected synchronized HBaseClient getClient(Configuration conf,
- SocketFactory factory, Class<? extends Writable> valueClass) {
HBaseClient client = clients.get(factory);
if (client == null) {
// Make an hbase client instead of hadoop Client.
- client = new HBaseClient(valueClass, conf, factory);
+ client = new HBaseClient(conf, factory);
clients.put(factory, client);
} else {
client.incCount();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Sun Aug 19 21:47:21 2012
@@ -55,6 +55,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -69,10 +71,8 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -80,11 +80,14 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.util.ReflectionUtils;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
-/** A client for an IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
+
+/** A client for an IPC service. IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
* <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
@@ -99,7 +102,6 @@ public class HBaseClient {
.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
- protected final Class<? extends Writable> valueClass; // class of call values
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
@@ -187,13 +189,13 @@ public class HBaseClient {
/** A call waiting for a value. */
protected class Call {
final int id; // call id
- final Writable param; // parameter
- Writable value; // value, null if error
+ final RpcRequestBody param; // rpc request object
+ Message value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
- protected Call(Writable param) {
+ protected Call(RpcRequestBody param) {
this.param = param;
this.startTime = System.currentTimeMillis();
synchronized (HBaseClient.this) {
@@ -223,7 +225,7 @@ public class HBaseClient {
*
* @param value return value of the call.
*/
- public synchronized void setValue(Writable value) {
+ public synchronized void setValue(Message value) {
this.value = value;
callComplete();
}
@@ -825,15 +827,19 @@ public class HBaseClient {
try {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
- RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
- builder.setCallId(call.id);
- DataOutputBuffer d = new DataOutputBuffer();
- builder.build().writeDelimitedTo(d);
- call.param.write(d);
+ RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
+ headerBuilder.setCallId(call.id);
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- this.out.writeInt(d.getLength());
- this.out.write(d.getData(), 0, d.getLength());
+ RpcRequestHeader header = headerBuilder.build();
+ int serializedHeaderSize = header.getSerializedSize();
+ int requestSerializedSize = call.param.getSerializedSize();
+ this.out.writeInt(serializedHeaderSize +
+ CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
+ requestSerializedSize +
+ CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
+ header.writeDelimitedTo(this.out);
+ call.param.writeDelimitedTo(this.out);
this.out.flush();
}
} catch(IOException e) {
@@ -870,8 +876,17 @@ public class HBaseClient {
Status status = response.getStatus();
if (status == Status.SUCCESS) {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
+ Message rpcResponseType;
+ try {
+ rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
+ ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
+ call.param.getMethodName()));
+ } catch (Exception e) {
+ throw new RuntimeException(e); //local exception
+ }
+ Builder builder = rpcResponseType.newBuilderForType();
+ builder.mergeDelimitedFrom(in);
+ Message value = builder.build();
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (call != null) {
@@ -983,7 +998,7 @@ public class HBaseClient {
private final ParallelResults results;
protected final int index;
- public ParallelCall(Writable param, ParallelResults results, int index) {
+ public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
super(param);
this.results = results;
this.index = index;
@@ -998,12 +1013,12 @@ public class HBaseClient {
/** Result collector for parallel calls. */
protected static class ParallelResults {
- protected final Writable[] values;
+ protected final Message[] values;
protected int size;
protected int count;
public ParallelResults(int size) {
- this.values = new Writable[size];
+ this.values = new RpcResponseBody[size];
this.size = size;
}
@@ -1020,15 +1035,13 @@ public class HBaseClient {
}
/**
- * Construct an IPC client whose values are of the given {@link Writable}
+ * Construct an IPC client whose values are of the {@link Message}
* class.
* @param valueClass value class
* @param conf configuration
* @param factory socket factory
*/
- public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
- SocketFactory factory) {
- this.valueClass = valueClass;
+ public HBaseClient(Configuration conf, SocketFactory factory) {
this.maxIdleTime =
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
@@ -1051,8 +1064,8 @@ public class HBaseClient {
* @param valueClass value class
* @param conf configuration
*/
- public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
- this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+ public HBaseClient(Configuration conf) {
+ this(conf, NetUtils.getDefaultSocketFactory(conf));
}
/**
@@ -1124,17 +1137,17 @@ public class HBaseClient {
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
- * @param param writable parameter
+ * @param param RpcRequestBody parameter
* @param address network address
- * @return Writable
+ * @return Message
* @throws IOException e
*/
- public Writable call(Writable param, InetSocketAddress address)
+ public Message call(RpcRequestBody param, InetSocketAddress address)
throws IOException, InterruptedException {
return call(param, address, null, 0);
}
- public Writable call(Writable param, InetSocketAddress addr,
+ public Message call(RpcRequestBody param, InetSocketAddress addr,
User ticket, int rpcTimeout)
throws IOException, InterruptedException {
return call(param, addr, null, ticket, rpcTimeout);
@@ -1145,7 +1158,7 @@ public class HBaseClient {
* with the <code>ticket</code> credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
- public Writable call(Writable param, InetSocketAddress addr,
+ public Message call(RpcRequestBody param, InetSocketAddress addr,
Class<? extends VersionedProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
@@ -1217,14 +1230,14 @@ public class HBaseClient {
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored.
- * @param params writable parameters
+ * @param params RpcRequestBody parameters
* @param addresses socket addresses
- * @return Writable[]
+ * @return RpcResponseBody[]
* @throws IOException e
- * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead
+ * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
*/
@Deprecated
- public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+ public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
throws IOException, InterruptedException {
return call(params, addresses, null, null);
}
@@ -1233,11 +1246,11 @@ public class HBaseClient {
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
- public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+ public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
Class<? extends VersionedProtocol> protocol,
User ticket)
throws IOException, InterruptedException {
- if (addresses.length == 0) return new Writable[0];
+ if (addresses.length == 0) return new RpcResponseBody[0];
ParallelResults results = new ParallelResults(params.length);
// TODO this synchronization block doesnt make any sense, we should possibly fix it
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Sun Aug 19 21:47:21 2012
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
@@ -46,24 +45,13 @@ import java.util.Map;
*
* This is a local hbase copy of the hadoop RPC so we can do things like
* address HADOOP-414 for hbase-only and try other hbase-specific
- * optimizations like using our own version of ObjectWritable. Class has been
- * renamed to avoid confusing it w/ hadoop versions.
+ * optimizations. Class has been renamed to avoid confusing it w/ hadoop
+ * versions.
* <p>
*
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
- * be one of:
- *
- * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
- * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
- * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- *
- * <li>a {@link String}; or</li>
- *
- * <li>a {@link Writable}; or</li>
- *
- * <li>an array of the above types</li> </ul>
- *
+ * be Protobuf objects.
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
@@ -122,7 +110,7 @@ public class HBaseRPC {
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
- conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
+ conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
@@ -345,16 +333,6 @@ public class HBaseRPC {
VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
- if (engine instanceof WritableRpcEngine) {
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- }
-
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
return proxy;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sun Aug 19 21:47:21 2012
@@ -68,12 +68,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
@@ -87,9 +85,7 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -104,17 +100,16 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
import org.cliffc.high_scale_lib.Counter;
-/** An abstract IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
+/** An abstract IPC service. IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
*
@@ -193,8 +188,8 @@ public abstract class HBaseServer implem
}
/** Returns the server instance called under or null. May be called under
- * {@link #call(Class, Writable, long, MonitoredRPCHandler)} implementations,
- * and under {@link Writable} methods of paramters and return values.
+ * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+ * and under protobuf methods of parameters and return values.
* Permits applications to access the server context.
* @return HBaseServer
*/
@@ -235,7 +230,6 @@ public abstract class HBaseServer implem
private int handlerCount; // number of handler threads
private int priorityHandlerCount;
private int readThreads; // number of read threads
- protected Class<? extends Writable> paramClass; // class of call parameters
protected int maxIdleTime; // the maximum idle time after
// which a client may be
// disconnected
@@ -312,7 +306,7 @@ public abstract class HBaseServer implem
/** A call queued for handling. */
protected class Call implements RpcCallContext {
protected int id; // the client's call id
- protected Writable param; // the parameter passed
+ protected RpcRequestBody rpcRequestBody; // the parameter passed
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
@@ -324,10 +318,10 @@ public abstract class HBaseServer implem
protected long size; // size of current call
protected boolean isError;
- public Call(int id, Writable param, Connection connection,
+ public Call(int id, RpcRequestBody rpcRequestBody, Connection connection,
Responder responder, long size) {
this.id = id;
- this.param = param;
+ this.rpcRequestBody = rpcRequestBody;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
@@ -339,7 +333,7 @@ public abstract class HBaseServer implem
@Override
public String toString() {
- return param.toString() + " from " + connection.toString();
+ return rpcRequestBody.toString() + " from " + connection.toString();
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
@@ -353,34 +347,13 @@ public abstract class HBaseServer implem
if (errorClass != null) {
this.isError = true;
}
- Writable result = null;
- if (value instanceof Writable) {
- result = (Writable) value;
+
+ ByteBufferOutputStream buf = null;
+ if (value != null) {
+ buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
} else {
- /* We might have a null value and errors. Avoid creating a
- * HbaseObjectWritable, because the constructor fails on null. */
- if (value != null) {
- result = new HbaseObjectWritable(value);
- }
- }
-
- int size = BUFFER_INITIAL_SIZE;
- if (result instanceof WritableWithSize) {
- // get the size hint.
- WritableWithSize ohint = (WritableWithSize) result;
- long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
- if (hint > Integer.MAX_VALUE) {
- // oops, new problem.
- IOException ioe =
- new IOException("Result buffer size too large: " + hint);
- errorClass = ioe.getClass().getName();
- error = StringUtils.stringifyException(ioe);
- } else {
- size = (int)hint;
- }
+ buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
}
-
- ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
@@ -394,7 +367,9 @@ public abstract class HBaseServer implem
b.setStackTrace(error);
b.build().writeDelimitedTo(out);
} else {
- result.write(out);
+ if (value != null) {
+ ((Message)value).writeDelimitedTo(out);
+ }
}
if (connection.useWrap) {
wrapWithSasl(buf);
@@ -709,7 +684,7 @@ public abstract class HBaseServer implem
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ignored) {}
- }
+ }
} catch (Exception e) {
closeCurrentConnection(key, e);
}
@@ -1418,7 +1393,7 @@ public abstract class HBaseServer implem
AccessControlException ae = new AccessControlException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
- null, ae.getClass().getName(), ae.getMessage());
+ ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
}
@@ -1506,7 +1481,7 @@ public abstract class HBaseServer implem
// Versions 3 and greater can interpret this exception
// response in the same manner
setupResponse(buffer, fakeCall, Status.FATAL,
- null, VersionMismatch.class.getName(), errMsg);
+ VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
}
@@ -1623,23 +1598,21 @@ public abstract class HBaseServer implem
if (LOG.isDebugEnabled()) {
LOG.debug(" got call #" + id + ", " + callSize + " bytes");
}
-
// Enforcing the call queue size, this triggers a retry in the client
if ((callSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
+ setupResponse(responseBuffer, callTooBig, Status.FATAL,
IOException.class.getName(),
"Call queue is full, is ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
}
- Writable param;
+ RpcRequestBody rpcRequestBody;
try {
- param = ReflectionUtils.newInstance(paramClass, conf);//read param
- param.readFields(dis);
+ rpcRequestBody = RpcRequestBody.parseDelimitedFrom(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t);
@@ -1647,16 +1620,16 @@ public abstract class HBaseServer implem
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL,
t.getClass().getName(),
"IPC server unable to read call parameters: " + t.getMessage());
responder.doRespond(readParamsFailedCall);
return;
}
- Call call = new Call(id, param, this, responder, callSize);
+ Call call = new Call(id, rpcRequestBody, this, responder, callSize);
callQueueSize.add(callSize);
- if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
+ if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) {
priorityCallQueue.put(call);
updateCallQueueLenMetrics(priorityCallQueue);
} else {
@@ -1683,7 +1656,7 @@ public abstract class HBaseServer implem
} catch (AuthorizationException ae) {
LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
rpcMetrics.authorizationFailures.inc();
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
@@ -1785,7 +1758,7 @@ public abstract class HBaseServer implem
String errorClass = null;
String error = null;
- Writable value = null;
+ Message value = null;
CurCall.set(call);
try {
@@ -1802,7 +1775,7 @@ public abstract class HBaseServer implem
RequestContext.set(User.create(call.connection.user), getRemoteIp(),
call.connection.protocol);
// make the call
- value = call(call.connection.protocol, call.param, call.timestamp,
+ value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp,
status);
} catch (Throwable e) {
LOG.debug(getName()+", call "+call+": error: " + e, e);
@@ -1855,7 +1828,7 @@ public abstract class HBaseServer implem
}
- private Function<Writable,Integer> qosFunction = null;
+ private Function<RpcRequestBody,Integer> qosFunction = null;
/**
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
@@ -1864,16 +1837,16 @@ public abstract class HBaseServer implem
* @param newFunc
*/
@Override
- public void setQosFunction(Function<Writable, Integer> newFunc) {
+ public void setQosFunction(Function<RpcRequestBody, Integer> newFunc) {
qosFunction = newFunc;
}
- protected int getQosLevel(Writable param) {
+ protected int getQosLevel(RpcRequestBody rpcRequestBody) {
if (qosFunction == null) {
return 0;
}
- Integer res = qosFunction.apply(param);
+ Integer res = qosFunction.apply(rpcRequestBody);
if (res == null) {
return 0;
}
@@ -1886,14 +1859,13 @@ public abstract class HBaseServer implem
*
*/
protected HBaseServer(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
+ int handlerCount,
int priorityHandlerCount, Configuration conf, String serverName,
int highPriorityLevel)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
- this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.socketSendBufferSize = 0;
@@ -1963,26 +1935,10 @@ public abstract class HBaseServer implem
*/
private void setupResponse(ByteArrayOutputStream response,
Call call, Status status,
- Writable rv, String errorClass, String error)
+ String errorClass, String error)
throws IOException {
response.reset();
- DataOutputStream out = new DataOutputStream(response);
-
- if (status == Status.SUCCESS) {
- try {
- rv.write(out);
- call.setResponse(rv, status, null, null);
- } catch (Throwable t) {
- LOG.warn("Error serializing call response for call " + call, t);
- // Call back to same function - this is OK since the
- // buffer is reset at the top, and since status is changed
- // to ERROR it won't infinite loop.
- call.setResponse(null, status.ERROR, t.getClass().getName(),
- StringUtils.stringifyException(t));
- }
- } else {
- call.setResponse(rv, status, errorClass, error);
- }
+ call.setResponse(null, status, errorClass, error);
}
protected void closeConnection(Connection connection) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java Sun Aug 19 21:47:21 2012
@@ -18,14 +18,13 @@
package org.apache.hadoop.hbase.ipc;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,14 +34,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.hbase.util.ProtoUtil;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@@ -80,8 +85,9 @@ class ProtobufRpcEngine implements RpcEn
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
metaHandlerCount, verbose, highPriorityLevel);
}
- private static class Invoker implements InvocationHandler {
- private final Map<String, Message> returnTypes =
+
+ static class Invoker implements InvocationHandler {
+ private static final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
@@ -97,7 +103,7 @@ class ProtobufRpcEngine implements RpcEn
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
+ this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
Long version = Invocation.PROTOCOL_VERSION.get(protocol);
if (version != null) {
@@ -133,6 +139,7 @@ class ProtobufRpcEngine implements RpcEn
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
+ builder.setRequestClassName(param.getClass().getName());
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
@@ -166,24 +173,20 @@ class ProtobufRpcEngine implements RpcEn
}
RpcRequestBody rpcRequest = constructRpcRequest(method, args);
- RpcResponseWritable val = null;
+ Message val = null;
try {
- val = (RpcResponseWritable) client.call(
- new RpcRequestWritable(rpcRequest), address, protocol, ticket,
- rpcTimeout);
+ val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
+ if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
}
-
- Message protoType = null;
- protoType = getReturnProtoType(method);
- Message returnMessage;
- returnMessage = protoType.newBuilderForType()
- .mergeFrom(val.responseMessage).build();
- return returnMessage;
+ return val;
} catch (Throwable e) {
+ if (e instanceof RemoteException) {
+ Throwable cause = ((RemoteException)e).unwrapRemoteException();
+ throw new ServiceException(cause);
+ }
throw new ServiceException(e);
}
}
@@ -195,7 +198,7 @@ class ProtobufRpcEngine implements RpcEn
}
}
- private Message getReturnProtoType(Method method) throws Exception {
+ static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
@@ -209,75 +212,7 @@ class ProtobufRpcEngine implements RpcEn
}
}
- /**
- * Writable Wrapper for Protocol Buffer Requests
- */
- private static class RpcRequestWritable implements Writable {
- RpcRequestBody message;
-
- @SuppressWarnings("unused")
- public RpcRequestWritable() {
- }
-
- RpcRequestWritable(RpcRequestBody message) {
- this.message = message;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- ((Message)message).writeDelimitedTo(
- DataOutputOutputStream.constructOutputStream(out));
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = ProtoUtil.readRawVarint32(in);
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- message = RpcRequestBody.parseFrom(bytes);
- }
-
- public int getSerializedSize() {
- return message.getSerializedSize();
- }
-
- @Override
- public String toString() {
- return " Client Protocol Version: " +
- message.getClientProtocolVersion() + " MethodName: " +
- message.getMethodName();
- }
- }
-
- /**
- * Writable Wrapper for Protocol Buffer Responses
- */
- private static class RpcResponseWritable implements Writable {
- byte[] responseMessage;
-
- @SuppressWarnings("unused")
- public RpcResponseWritable() {
- }
-
- public RpcResponseWritable(Message message) {
- this.responseMessage = message.toByteArray();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(responseMessage.length);
- out.write(responseMessage);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- responseMessage = bytes;
- }
- }
- public static class Server extends WritableRpcEngine.Server {
+ public static class Server extends HBaseServer {
boolean verbose;
Object instance;
Class<?> implementation;
@@ -295,16 +230,27 @@ class ProtobufRpcEngine implements RpcEn
private final int warnResponseTime;
private final int warnResponseSize;
+
+ private static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length-1];
+ }
+
public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel)
throws IOException {
- super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
- numHandlers, metaHandlerCount, verbose, highPriorityLevel);
- this.verbose = verbose;
+ super(bindAddress, port, numHandlers, metaHandlerCount,
+ conf, classNameBase(instance.getClass().getName()),
+ highPriorityLevel);
this.instance = instance;
this.implementation = instance.getClass();
+ this.verbose = verbose;
+
// create metrics for the advertised interfaces this server implements.
String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
@@ -313,23 +259,55 @@ class ProtobufRpcEngine implements RpcEn
DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
+ this.verbose = verbose;
+ this.instance = instance;
+ this.implementation = instance.getClass();
}
- private final Map<String, Message> methodArg =
+ private static final Map<String, Message> methodArg =
new ConcurrentHashMap<String, Message>();
- private final Map<String, Method> methodInstances =
+ private static final Map<String, Method> methodInstances =
new ConcurrentHashMap<String, Method>();
+
+ private AuthenticationTokenSecretManager createSecretManager(){
+ if (!User.isSecurityEnabled() ||
+ !(instance instanceof org.apache.hadoop.hbase.Server)) {
+ return null;
+ }
+ org.apache.hadoop.hbase.Server server =
+ (org.apache.hadoop.hbase.Server)instance;
+ Configuration conf = server.getConfiguration();
+ long keyUpdateInterval =
+ conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
+ long maxAge =
+ conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
+ return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+ server.getServerName().toString(), keyUpdateInterval, maxAge);
+ }
+
+ @Override
+ public void startThreads() {
+ AuthenticationTokenSecretManager mgr = createSecretManager();
+ if (mgr != null) {
+ setSecretManager(mgr);
+ mgr.start();
+ }
+ this.authManager = new ServiceAuthorizationManager();
+ HBasePolicyProvider.init(conf, authManager);
+
+ // continue with base startup
+ super.startThreads();
+ }
+
@Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
- public Writable call(Class<? extends VersionedProtocol> protocol,
- Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
+ public Message call(Class<? extends VersionedProtocol> protocol,
+ RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
try {
- RpcRequestWritable request = (RpcRequestWritable) writableRequest;
- RpcRequestBody rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
Method method = getMethod(protocol, methodName);
if (method == null) {
@@ -358,7 +336,7 @@ class ProtobufRpcEngine implements RpcEn
status.setRPC(rpcRequest.getMethodName(),
new Object[]{rpcRequest.getRequest()}, receiveTime);
- status.setRPCPacket(writableRequest);
+ status.setRPCPacket(rpcRequest);
status.resume("Servicing call");
//get an instance of the method arg type
Message protoType = getMethodArgType(method);
@@ -398,7 +376,7 @@ class ProtobufRpcEngine implements RpcEn
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(method.getName(), processingTime);
if (verbose) {
- WritableRpcEngine.log("Return: "+result, LOG);
+ log("Return: "+result, LOG);
}
long responseSize = result.getSerializedSize();
// log any RPC responses that are slower than the configured warn
@@ -432,7 +410,7 @@ class ProtobufRpcEngine implements RpcEn
rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
processingTime);
}
- return new RpcResponseWritable(result);
+ return result;
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
@@ -454,7 +432,7 @@ class ProtobufRpcEngine implements RpcEn
}
}
- private Method getMethod(Class<? extends VersionedProtocol> protocol,
+ static Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
@@ -472,7 +450,7 @@ class ProtobufRpcEngine implements RpcEn
return null;
}
- private Message getMethodArgType(Method method) throws Exception {
+ static Message getMethodArgType(Method method) throws Exception {
Message protoType = methodArg.get(method.getName());
if (protoType != null) {
return protoType;
@@ -497,5 +475,68 @@ class ProtobufRpcEngine implements RpcEn
methodArg.put(method.getName(), protoType);
return protoType;
}
+ /**
+ * Logs an RPC response to the LOG file, producing valid JSON objects for
+ * client Operations.
+ * @param params The parameters received in the call.
+ * @param methodName The name of the method invoked
+ * @param call The string representation of the call
+ * @param tag The tag that will be used to indicate this event in the log.
+ * @param clientAddress The address of the client who made this call.
+ * @param startTime The time that the call was initiated, in ms.
+ * @param processingTime The duration that the call took to run, in ms.
+ * @param qTime The duration that the call spent on the queue
+ * prior to being initiated, in ms.
+ * @param responseSize The size in bytes of the response buffer.
+ */
+ void logResponse(Object[] params, String methodName, String call, String tag,
+ String clientAddress, long startTime, int processingTime, int qTime,
+ long responseSize)
+ throws IOException {
+ // for JSON encoding
+ ObjectMapper mapper = new ObjectMapper();
+ // base information that is reported regardless of type of call
+ Map<String, Object> responseInfo = new HashMap<String, Object>();
+ responseInfo.put("starttimems", startTime);
+ responseInfo.put("processingtimems", processingTime);
+ responseInfo.put("queuetimems", qTime);
+ responseInfo.put("responsesize", responseSize);
+ responseInfo.put("client", clientAddress);
+ responseInfo.put("class", instance.getClass().getSimpleName());
+ responseInfo.put("method", methodName);
+ if (params.length == 2 && instance instanceof HRegionServer &&
+ params[0] instanceof byte[] &&
+ params[1] instanceof Operation) {
+ // if the slow process is a query, we want to log its table as well
+ // as its own fingerprint
+ byte [] tableName =
+ HRegionInfo.parseRegionName((byte[]) params[0])[0];
+ responseInfo.put("table", Bytes.toStringBinary(tableName));
+ // annotate the response map with operation details
+ responseInfo.putAll(((Operation) params[1]).toMap());
+ // report to the log file
+ LOG.warn("(operation" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ } else if (params.length == 1 && instance instanceof HRegionServer &&
+ params[0] instanceof Operation) {
+ // annotate the response map with operation details
+ responseInfo.putAll(((Operation) params[0]).toMap());
+ // report to the log file
+ LOG.warn("(operation" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ } else {
+ // can't get JSON details, so just report call.toString() along with
+ // a more generic tag.
+ responseInfo.put("call", call);
+ LOG.warn("(response" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ }
+ }
+ protected static void log(String value, Log LOG) {
+ String v = value;
+ if (v != null && v.length() > 55)
+ v = v.substring(0, 55)+"...";
+ LOG.info(v);
+ }
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Sun Aug 19 21:47:21 2012
@@ -21,10 +21,12 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
-import org.apache.hadoop.io.Writable;
+import com.google.protobuf.Message;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -47,16 +49,16 @@ public interface RpcServer {
/** Called for each call.
* @param param writable parameter
* @param receiveTime time
- * @return Writable
+ * @return Message
* @throws java.io.IOException e
*/
- Writable call(Class<? extends VersionedProtocol> protocol,
- Writable param, long receiveTime, MonitoredRPCHandler status)
+ Message call(Class<? extends VersionedProtocol> protocol,
+ RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
void setErrorHandler(HBaseRPCErrorHandler handler);
- void setQosFunction(Function<Writable, Integer> newFunc);
+ void setQosFunction(Function<RpcRequestBody, Integer> newFunc);
void openServer();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Sun Aug 19 21:47:21 2012
@@ -1,468 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.UndeclaredThrowableException;
-
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Operation;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.ServiceException;
-
-/** An RpcEngine implementation for Writable data. */
-@InterfaceAudience.Private
-class WritableRpcEngine implements RpcEngine {
- // LOG is NOT in hbase subpackage intentionally so that the default HBase
- // DEBUG log level does NOT emit RPC-level logging.
- private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
-
- protected final static ClientCache CLIENTS = new ClientCache();
-
- private static class Invoker implements InvocationHandler {
- private Class<? extends VersionedProtocol> protocol;
- private InetSocketAddress address;
- private User ticket;
- private HBaseClient client;
- private boolean isClosed = false;
- final private int rpcTimeout;
-
- public Invoker(Class<? extends VersionedProtocol> protocol,
- InetSocketAddress address, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) {
- this.protocol = protocol;
- this.address = address;
- this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory);
- this.rpcTimeout = rpcTimeout;
- }
-
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- final boolean logDebug = LOG.isDebugEnabled();
- long startTime = 0;
- if (logDebug) {
- startTime = System.currentTimeMillis();
- }
-
- try {
- HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address, protocol, ticket,
- rpcTimeout);
- if (logDebug) {
- // FIGURE HOW TO TURN THIS OFF!
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
- }
- return value.get();
- } catch (Throwable t) {
- // For protobuf protocols, ServiceException is expected
- if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
- if (t instanceof RemoteException) {
- Throwable cause = ((RemoteException)t).unwrapRemoteException();
- throw new ServiceException(cause);
- }
- throw new ServiceException(t);
- }
- throw t;
- }
- }
-
- /* close the IPC client that's responsible for this invoker's RPCs */
- synchronized protected void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
- }
-
- /** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol, long clientVersion,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
- throws IOException {
-
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- try {
- long serverVersion = ((VersionedProtocol)proxy)
- .getProtocolVersion(protocol.getName(), clientVersion);
- if (serverVersion != clientVersion) {
- throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- } catch (Throwable t) {
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
- }
- if (t instanceof ServiceException) {
- throw ProtobufUtil.getRemoteException((ServiceException)t);
- }
- if (!(t instanceof IOException)) {
- LOG.error("Unexpected throwable object ", t);
- throw new IOException(t);
- }
- throw (IOException)t;
- }
- return proxy;
- }
-
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public void stopProxy(VersionedProtocol proxy) {
- if (proxy!=null) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
- }
-
- /** Construct a server for a protocol implementation instance listening on a
- * port and address. */
- public Server getServer(Class<? extends VersionedProtocol> protocol,
- Object instance,
- Class<?>[] ifaces,
- String bindAddress, int port,
- int numHandlers,
- int metaHandlerCount, boolean verbose,
- Configuration conf, int highPriorityLevel)
- throws IOException {
- return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
- metaHandlerCount, verbose, highPriorityLevel);
- }
-
- /** An RPC Server. */
- public static class Server extends HBaseServer {
- private Object instance;
- private Class<?> implementation;
- private Class<?>[] ifaces;
- private boolean verbose;
-
- private static final String WARN_RESPONSE_TIME =
- "hbase.ipc.warn.response.time";
- private static final String WARN_RESPONSE_SIZE =
- "hbase.ipc.warn.response.size";
-
- /** Default value for above params */
- private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
- private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
-
- /** Names for suffixed metrics */
- private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
-
- private final int warnResponseTime;
- private final int warnResponseSize;
-
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length-1];
- }
-
- /** Construct an RPC server.
- * @param instance the instance whose methods will be called
- * @param ifaces the interfaces the server supports
- * @param paramClass an instance of this class is used to read the RPC requests
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param metaHandlerCount the number of meta handlers desired
- * @param verbose whether each call should be logged
- * @param highPriorityLevel the priority level this server treats as high priority RPCs
- * @throws IOException e
- */
- public Server(Object instance, final Class<?>[] ifaces,
- Class<? extends Writable> paramClass,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int metaHandlerCount, boolean verbose,
- int highPriorityLevel) throws IOException {
- super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
- conf, classNameBase(instance.getClass().getName()),
- highPriorityLevel);
- this.instance = instance;
- this.implementation = instance.getClass();
- this.verbose = verbose;
-
- this.ifaces = ifaces;
-
- // create metrics for the advertised interfaces this server implements.
- String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
- this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes);
-
- this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
- DEFAULT_WARN_RESPONSE_TIME);
- this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
- DEFAULT_WARN_RESPONSE_SIZE);
- }
-
- public Server(Object instance, final Class<?>[] ifaces,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int metaHandlerCount, boolean verbose,
- int highPriorityLevel) throws IOException {
- this(instance, ifaces, Invocation.class, conf, bindAddress, port,
- numHandlers, metaHandlerCount, verbose, highPriorityLevel);
- }
-
- public AuthenticationTokenSecretManager createSecretManager(){
- if (!User.isSecurityEnabled() ||
- !(instance instanceof org.apache.hadoop.hbase.Server)) {
- return null;
- }
- org.apache.hadoop.hbase.Server server =
- (org.apache.hadoop.hbase.Server)instance;
- Configuration conf = server.getConfiguration();
- long keyUpdateInterval =
- conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
- long maxAge =
- conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
- return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
- server.getServerName().toString(), keyUpdateInterval, maxAge);
- }
-
- @Override
- public void startThreads() {
- AuthenticationTokenSecretManager mgr = createSecretManager();
- if (mgr != null) {
- setSecretManager(mgr);
- mgr.start();
- }
- this.authManager = new ServiceAuthorizationManager();
- HBasePolicyProvider.init(conf, authManager);
-
- // continue with base startup
- super.startThreads();
- }
-
- @Override
- public Writable call(Class<? extends VersionedProtocol> protocol,
- Writable param, long receivedTime, MonitoredRPCHandler status)
- throws IOException {
- try {
- Invocation call = (Invocation)param;
- if(call.getMethodName() == null) {
- throw new IOException("Could not find requested method, the usual " +
- "cause is a version mismatch between client and server.");
- }
- if (verbose) log("Call: " + call, LOG);
- status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
- status.setRPCPacket(param);
- status.resume("Servicing call");
-
- Method method =
- protocol.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
-
- //Verify protocol version.
- //Bypass the version check for VersionedProtocol
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
- long clientVersion = call.getProtocolVersion();
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
- .getProtocolSignature(protocol.getCanonicalName(), call
- .getProtocolVersion(), call.getClientMethodsHash());
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- LOG.warn("Version mismatch: client version=" + clientVersion
- + ", server version=" + serverVersion);
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
- Object impl = null;
- if (protocol.isAssignableFrom(this.implementation)) {
- impl = this.instance;
- }
- else {
- throw new HBaseRPC.UnknownProtocolException(protocol);
- }
-
- long startTime = System.currentTimeMillis();
- Object[] params = call.getParameters();
- Object value = method.invoke(impl, params);
- int processingTime = (int) (System.currentTimeMillis() - startTime);
- int qTime = (int) (startTime-receivedTime);
- if (TRACELOG.isDebugEnabled()) {
- TRACELOG.debug("Call #" + CurCall.get().id +
- "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
- " queueTime=" + qTime +
- " processingTime=" + processingTime +
- " contents=" + Objects.describeQuantity(params));
- }
- rpcMetrics.rpcQueueTime.inc(qTime);
- rpcMetrics.rpcProcessingTime.inc(processingTime);
- rpcMetrics.inc(call.getMethodName(), processingTime);
- if (verbose) log("Return: "+value, LOG);
-
- HbaseObjectWritable retVal =
- new HbaseObjectWritable(method.getReturnType(), value);
- long responseSize = retVal.getWritableSize();
- // log any RPC responses that are slower than the configured warn
- // response time or larger than configured warning size
- boolean tooSlow = (processingTime > warnResponseTime
- && warnResponseTime > -1);
- boolean tooLarge = (responseSize > warnResponseSize
- && warnResponseSize > -1);
- if (tooSlow || tooLarge) {
- // when tagging, we let TooLarge trump TooSmall to keep output simple
- // note that large responses will often also be slow.
- logResponse(call.getParameters(), call.getMethodName(),
- call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
- status.getClient(), startTime, processingTime, qTime,
- responseSize);
- // provides a count of log-reported slow responses
- if (tooSlow) {
- rpcMetrics.rpcSlowResponseTime.inc(processingTime);
- }
- }
- if (processingTime > 1000) {
- // we use a hard-coded one second period so that we can clearly
- // indicate the time period we're warning about in the name of the
- // metric itself
- rpcMetrics.inc(call.getMethodName() + ABOVE_ONE_SEC_METRIC,
- processingTime);
- }
-
- return retVal;
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- throw (IOException)target;
- }
- if (target instanceof ServiceException) {
- throw ProtobufUtil.getRemoteException((ServiceException)target);
- }
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
- throw ioe;
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
- }
- }
-
- /**
- * Logs an RPC response to the LOG file, producing valid JSON objects for
- * client Operations.
- * @param params The parameters received in the call.
- * @param methodName The name of the method invoked
- * @param call The string representation of the call
- * @param tag The tag that will be used to indicate this event in the log.
- * @param client The address of the client who made this call.
- * @param startTime The time that the call was initiated, in ms.
- * @param processingTime The duration that the call took to run, in ms.
- * @param qTime The duration that the call spent on the queue
- * prior to being initiated, in ms.
- * @param responseSize The size in bytes of the response buffer.
- */
- void logResponse(Object[] params, String methodName, String call, String tag,
- String clientAddress, long startTime, int processingTime, int qTime,
- long responseSize)
- throws IOException {
- // for JSON encoding
- ObjectMapper mapper = new ObjectMapper();
- // base information that is reported regardless of type of call
- Map<String, Object> responseInfo = new HashMap<String, Object>();
- responseInfo.put("starttimems", startTime);
- responseInfo.put("processingtimems", processingTime);
- responseInfo.put("queuetimems", qTime);
- responseInfo.put("responsesize", responseSize);
- responseInfo.put("client", clientAddress);
- responseInfo.put("class", instance.getClass().getSimpleName());
- responseInfo.put("method", methodName);
- if (params.length == 2 && instance instanceof HRegionServer &&
- params[0] instanceof byte[] &&
- params[1] instanceof Operation) {
- // if the slow process is a query, we want to log its table as well
- // as its own fingerprint
- byte [] tableName =
- HRegionInfo.parseRegionName((byte[]) params[0])[0];
- responseInfo.put("table", Bytes.toStringBinary(tableName));
- // annotate the response map with operation details
- responseInfo.putAll(((Operation) params[1]).toMap());
- // report to the log file
- LOG.warn("(operation" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- } else if (params.length == 1 && instance instanceof HRegionServer &&
- params[0] instanceof Operation) {
- // annotate the response map with operation details
- responseInfo.putAll(((Operation) params[0]).toMap());
- // report to the log file
- LOG.warn("(operation" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- } else {
- // can't get JSON details, so just report call.toString() along with
- // a more generic tag.
- responseInfo.put("call", call);
- LOG.warn("(response" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- }
- }
- }
-
- protected static void log(String value, Log LOG) {
- String v = value;
- if (v != null && v.length() > 55)
- v = v.substring(0, 55)+"...";
- LOG.info(v);
- }
-}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Sun Aug 19 21:47:21 2012
@@ -20,8 +20,7 @@
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
/**
* A MonitoredTask implementation optimized for use with RPC Handlers
@@ -38,9 +37,9 @@ public interface MonitoredRPCHandler ext
public abstract long getRPCQueueTime();
public abstract boolean isRPCRunning();
public abstract boolean isOperationRunning();
-
+
public abstract void setRPC(String methodName, Object [] params,
long queueTime);
- public abstract void setRPCPacket(Writable param);
+ public abstract void setRPCPacket(RpcRequestBody param);
public abstract void setConnection(String clientAddress, int remotePort);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Sun Aug 19 21:47:21 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.monitori
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -46,7 +47,7 @@ public class MonitoredRPCHandlerImpl ext
private long rpcStartTime;
private String methodName = "";
private Object [] params = {};
- private Writable packet;
+ private RpcRequestBody packet;
public MonitoredRPCHandlerImpl() {
super();
@@ -141,11 +142,7 @@ public class MonitoredRPCHandlerImpl ext
// no RPC is currently running, or we don't have an RPC's packet info
return -1L;
}
- if (!(packet instanceof WritableWithSize)) {
- // the packet passed to us doesn't expose size information
- return -1L;
- }
- return ((WritableWithSize) packet).getWritableSize();
+ return packet.getSerializedSize();
}
/**
@@ -201,11 +198,11 @@ public class MonitoredRPCHandlerImpl ext
}
/**
- * Gives this instance a reference to the Writable received by the RPC, so
+ * Gives this instance a reference to the protobuf received by the RPC, so
* that it can later compute its size if asked for it.
- * @param param The Writable received by the RPC for this call
+ * @param param The protobuf received by the RPC for this call
*/
- public void setRPCPacket(Writable param) {
+ public void setRPCPacket(RpcRequestBody param) {
this.packet = param;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java?rev=1374860&r1=1374859&r2=1374860&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java Sun Aug 19 21:47:21 2012
@@ -1492,6 +1492,10 @@ public final class RPCProtos {
// optional bytes request = 3;
boolean hasRequest();
com.google.protobuf.ByteString getRequest();
+
+ // optional string requestClassName = 4;
+ boolean hasRequestClassName();
+ String getRequestClassName();
}
public static final class RpcRequestBody extends
com.google.protobuf.GeneratedMessage
@@ -1574,10 +1578,43 @@ public final class RPCProtos {
return request_;
}
+ // optional string requestClassName = 4;
+ public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4;
+ private java.lang.Object requestClassName_;
+ public boolean hasRequestClassName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getRequestClassName() {
+ java.lang.Object ref = requestClassName_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ requestClassName_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getRequestClassNameBytes() {
+ java.lang.Object ref = requestClassName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ requestClassName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
methodName_ = "";
clientProtocolVersion_ = 0L;
request_ = com.google.protobuf.ByteString.EMPTY;
+ requestClassName_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1604,6 +1641,9 @@ public final class RPCProtos {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, request_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, getRequestClassNameBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -1625,6 +1665,10 @@ public final class RPCProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, request_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getRequestClassNameBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1663,6 +1707,11 @@ public final class RPCProtos {
result = result && getRequest()
.equals(other.getRequest());
}
+ result = result && (hasRequestClassName() == other.hasRequestClassName());
+ if (hasRequestClassName()) {
+ result = result && getRequestClassName()
+ .equals(other.getRequestClassName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1684,6 +1733,10 @@ public final class RPCProtos {
hash = (37 * hash) + REQUEST_FIELD_NUMBER;
hash = (53 * hash) + getRequest().hashCode();
}
+ if (hasRequestClassName()) {
+ hash = (37 * hash) + REQUESTCLASSNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRequestClassName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -1806,6 +1859,8 @@ public final class RPCProtos {
bitField0_ = (bitField0_ & ~0x00000002);
request_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
+ requestClassName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -1856,6 +1911,10 @@ public final class RPCProtos {
to_bitField0_ |= 0x00000004;
}
result.request_ = request_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.requestClassName_ = requestClassName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1881,6 +1940,9 @@ public final class RPCProtos {
if (other.hasRequest()) {
setRequest(other.getRequest());
}
+ if (other.hasRequestClassName()) {
+ setRequestClassName(other.getRequestClassName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1931,6 +1993,11 @@ public final class RPCProtos {
request_ = input.readBytes();
break;
}
+ case 34: {
+ bitField0_ |= 0x00000008;
+ requestClassName_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -2018,6 +2085,42 @@ public final class RPCProtos {
return this;
}
+ // optional string requestClassName = 4;
+ private java.lang.Object requestClassName_ = "";
+ public boolean hasRequestClassName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getRequestClassName() {
+ java.lang.Object ref = requestClassName_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ requestClassName_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setRequestClassName(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ requestClassName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRequestClassName() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ requestClassName_ = getDefaultInstance().getRequestClassName();
+ onChanged();
+ return this;
+ }
+ void setRequestClassName(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000008;
+ requestClassName_ = value;
+ onChanged();
+ }
+
// @@protoc_insertion_point(builder_scope:RpcRequestBody)
}
@@ -2032,7 +2135,7 @@ public final class RPCProtos {
public interface RpcResponseHeaderOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required int32 callId = 1;
+ // required uint32 callId = 1;
boolean hasCallId();
int getCallId();
@@ -2141,7 +2244,7 @@ public final class RPCProtos {
}
private int bitField0_;
- // required int32 callId = 1;
+ // required uint32 callId = 1;
public static final int CALLID_FIELD_NUMBER = 1;
private int callId_;
public boolean hasCallId() {
@@ -2186,7 +2289,7 @@ public final class RPCProtos {
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
- output.writeInt32(1, callId_);
+ output.writeUInt32(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeEnum(2, status_.getNumber());
@@ -2202,7 +2305,7 @@ public final class RPCProtos {
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
- .computeInt32Size(1, callId_);
+ .computeUInt32Size(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
@@ -2487,7 +2590,7 @@ public final class RPCProtos {
}
case 8: {
bitField0_ |= 0x00000001;
- callId_ = input.readInt32();
+ callId_ = input.readUInt32();
break;
}
case 16: {
@@ -2507,7 +2610,7 @@ public final class RPCProtos {
private int bitField0_;
- // required int32 callId = 1;
+ // required uint32 callId = 1;
private int callId_ ;
public boolean hasCallId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
@@ -3505,16 +3608,17 @@ public final class RPCProtos {
"ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
"rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
"doop.hbase.client.ClientProtocol\"\"\n\020RpcR" +
- "equestHeader\022\016\n\006callId\030\001 \002(\r\"T\n\016RpcReque" +
+ "equestHeader\022\016\n\006callId\030\001 \002(\r\"n\n\016RpcReque" +
"stBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientProt" +
- "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\"{\n\021Rp" +
- "cResponseHeader\022\016\n\006callId\030\001 \002(\005\022)\n\006statu" +
- "s\030\002 \002(\0162\031.RpcResponseHeader.Status\"+\n\006St",
- "atus\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"" +
- "#\n\017RpcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014" +
- "RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\ns" +
- "tackTrace\030\002 \001(\tB<\n*org.apache.hadoop.hba" +
- "se.protobuf.generatedB\tRPCProtosH\001\240\001\001"
+ "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020re" +
+ "questClassName\030\004 \001(\t\"{\n\021RpcResponseHeade" +
+ "r\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.RpcR",
+ "esponseHeader.Status\"+\n\006Status\022\013\n\007SUCCES" +
+ "S\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcResponse" +
+ "Body\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022\025" +
+ "\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001(" +
+ "\tB<\n*org.apache.hadoop.hbase.protobuf.ge" +
+ "neratedB\tRPCProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3550,7 +3654,7 @@ public final class RPCProtos {
internal_static_RpcRequestBody_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RpcRequestBody_descriptor,
- new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", },
+ new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", },
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class,
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class);
internal_static_RpcResponseHeader_descriptor =