You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2011/12/04 21:44:38 UTC
svn commit: r1210208 [1/2] - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ipc/
src/main/java/org/apache/hadoop/ipc/protobuf/ src/proto/
src/test/java/org/apache/hadoop/ipc/ src/test/java/org/apache/had...
Author: sradia
Date: Sun Dec 4 20:44:36 2011
New Revision: 1210208
URL: http://svn.apache.org/viewvc?rev=1210208&view=rev
Log:
HADOOP-7862 Move the support for multiple protocols to lower layer so that Writable, PB and Avro can all use it (includes HDFS and MR changes to match) (Sanjay)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/TestRpcServiceProtos.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sun Dec 4 20:44:36 2011
@@ -61,6 +61,9 @@ Trunk (unreleased changes)
HADOOP-7590. Mavenize streaming and MR examples. (tucu)
+ HADOOP-7862 Move the support for multiple protocols to lower layer so that Writable,
+ PB and Avro can all use it (Sanjay)
+
BUGS
HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Sun Dec 4 20:44:36 2011
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements Rp
super((Class)null, new Object(), conf,
bindAddress, port, numHandlers, numReaders,
queueSizePerHandler, verbose, secretManager);
- super.addProtocol(TunnelProtocol.class, responder);
+ // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+ super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
responder.addProtocol(iface, impl);
}
@Override
- public <PROTO, IMPL extends PROTO> Server
- addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+ public Server
+ addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
throws IOException {
responder.addProtocol(protocolClass, protocolImpl);
return this;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sun Dec 4 20:44:36 2011
@@ -1002,17 +1002,19 @@ public class Client {
}
/**
- * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+ * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+ * for RPC_BUILTIN
*/
public Writable call(Writable param, InetSocketAddress address)
throws InterruptedException, IOException {
- return call(RpcKind.RPC_WRITABLE, param, address);
+ return call(RpcKind.RPC_BUILTIN, param, address);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1025,7 +1027,8 @@ public class Client {
* the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
@@ -1042,7 +1045,8 @@ public class Client {
* timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+ * ConnectionId)} instead
*/
@Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
@@ -1056,7 +1060,7 @@ public class Client {
/**
- * Same as {@link #call(RpcKind, Writable, InetSocketAddress,
+ * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
@@ -1066,7 +1070,7 @@ public class Client {
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
- return call(RpcKind.RPC_WRITABLE, param, remoteId);
+ return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
/**
@@ -1087,21 +1091,28 @@ public class Client {
}
/**
- * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
- * except the rpcKind is RPC_WRITABLE
+ * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+ * except the rpcKind is RPC_BUILTIN
*/
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
- return call(RpcKind.RPC_WRITABLE, param, remoteId);
+ return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
- /** Make a call, passing <code>param</code>, to the IPC server defined by
- * <code>remoteId</code>, returning the value.
+ /**
+ * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+ * <code>remoteId</code>, returning the rpc respond.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
- * threw an exception. */
- public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- Call call = new Call(rpcKind, param);
+ * threw an exception.
+ */
+ public Writable call(RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId) throws InterruptedException, IOException {
+ Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Sun Dec 4 20:44:36 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceExcept
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+
+ static { // Register the rpcRequest deserializer for WritableRpcEngine
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(
+ RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+ new Server.ProtoBufRpcInvoker());
+ }
private static final ClientCache CLIENTS = new ClientCache();
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implement
}
private static class Invoker implements InvocationHandler, Closeable {
- private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+ private final Map<String, Message> returnTypes =
+ new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
- private Client.ConnectionId remoteId;
- private Client client;
+ private final Client.ConnectionId remoteId;
+ private final Client client;
+ private final long clientProtocolVersion;
+ private final String protocolName;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implement
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory,
RpcResponseWritable.class);
+ this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+ this.protocolName = RPC.getProtocolName(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implement
Message param = (Message) params[1];
builder.setRequest(param.toByteString());
+ // For protobuf, {@code protocol} used when creating client side proxy is
+ // the interface extending BlockingInterface, which has the annotations
+ // such as ProtocolName etc.
+ //
+ // Using Method.getDeclaringClass(), as in WritableEngine to get at
+ // the protocol interface will return BlockingInterface, from where
+ // the annotation ProtocolName and Version cannot be
+ // obtained.
+ //
+ // Hence we simply use the protocol class used to create the proxy.
+ // For PB this may limit the use of mixins on client side.
+ builder.setDeclaringClassProtocolName(protocolName);
+ builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implement
RpcResponseWritable.class);
}
+
@Override
- public RPC.Server getServer(Class<?> protocol, Object instance,
+ public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers,
- numReaders, queueSizePerHandler, verbose, secretManager);
+ return new Server(protocol, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
}
private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implement
}
public static class Server extends RPC.Server {
- private BlockingService service;
- private boolean verbose;
-
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length - 1];
- }
-
/**
* Construct an RPC server.
*
- * @param instance the instance whose methods will be called
+ * @param protocolClass the class of protocol
+ * @param protocolImpl the protocolImpl whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
- public Server(Object instance, Configuration conf, String bindAddress,
- int port, int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port, int numHandlers,
+ int numReaders, int queueSizePerHandler, boolean verbose,
+ SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, RpcRequestWritable.class, numHandlers,
- numReaders, queueSizePerHandler, conf, classNameBase(instance
+ numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager);
- this.service = (BlockingService) instance;
- this.verbose = verbose;
+ this.verbose = verbose;
+ registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER,
+ protocolClass, protocolImpl);
}
- /**
- * This is a server side method, which is invoked over RPC. On success
- * the return response has protobuf response payload. On failure, the
- * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
- *
- * In this method there three types of exceptions possible and they are
- * returned in response as follows.
- * <ol>
- * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
- * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
- * this method returns in response the exception thrown by the service.</li>
- * <li> Other exceptions thrown by the service. They are returned as
- * it is.</li>
- * </ol>
- */
- @Override
- public Writable call(String protocol, Writable writableRequest,
- long receiveTime) throws IOException {
- RpcRequestWritable request = (RpcRequestWritable) writableRequest;
- HadoopRpcRequestProto rpcRequest = request.message;
- String methodName = rpcRequest.getMethodName();
- if (verbose)
- LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
- MethodDescriptor methodDescriptor = service.getDescriptorForType()
- .findMethodByName(methodName);
- if (methodDescriptor == null) {
- String msg = "Unknown method " + methodName + " called on " + protocol
- + " protocol.";
- LOG.warn(msg);
- return handleException(new RpcServerException(msg));
- }
- Message prototype = service.getRequestPrototype(methodDescriptor);
- Message param = prototype.newBuilderForType()
- .mergeFrom(rpcRequest.getRequest()).build();
- Message result;
- try {
- result = service.callBlockingMethod(methodDescriptor, null, param);
- } catch (ServiceException e) {
- Throwable cause = e.getCause();
- return handleException(cause != null ? cause : e);
- } catch (Exception e) {
- return handleException(e);
- }
-
- HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
- return new RpcResponseWritable(response);
- }
-
- private RpcResponseWritable handleException(Throwable e) {
+ private static RpcResponseWritable handleException(Throwable e) {
HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
.setExceptionName(e.getClass().getName())
.setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implement
return new RpcResponseWritable(response);
}
- private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+ private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
Message message) {
HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
.setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implement
.build();
return res;
}
+
+ /**
+ * Protobuf invoker for {@link RpcInvoker}
+ */
+ static class ProtoBufRpcInvoker implements RpcInvoker {
+
+ @Override
+ /**
+ * This is a server side method, which is invoked over RPC. On success
+ * the return response has protobuf response payload. On failure, the
+ * exception name and the stack trace are return in the resposne.
+ * See {@link HadoopRpcResponseProto}
+ *
+ * In this method there three types of exceptions possible and they are
+ * returned in response as follows.
+ * <ol>
+ * <li> Exceptions encountered in this method that are returned
+ * as {@link RpcServerException} </li>
+ * <li> Exceptions thrown by the service is wrapped in ServiceException.
+ * In that this method returns in response the exception thrown by the
+ * service.</li>
+ * <li> Other exceptions thrown by the service. They are returned as
+ * it is.</li>
+ * </ol>
+ */
+ public Writable call(RPC.Server server, String protocol,
+ Writable writableRequest, long receiveTime) throws IOException {
+ RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+ HadoopRpcRequestProto rpcRequest = request.message;
+ String methodName = rpcRequest.getMethodName();
+ String protoName = rpcRequest.getDeclaringClassProtocolName();
+ long clientVersion = rpcRequest.getClientProtocolVersion();
+ if (server.verbose)
+ LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+
+ ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+ ProtoClassProtoImpl protocolImpl =
+ server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
+ protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ }
+ // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
+
+ BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+ MethodDescriptor methodDescriptor = service.getDescriptorForType()
+ .findMethodByName(methodName);
+ if (methodDescriptor == null) {
+ String msg = "Unknown method " + methodName + " called on " + protocol
+ + " protocol.";
+ LOG.warn(msg);
+ return handleException(new RpcServerException(msg));
+ }
+ Message prototype = service.getRequestPrototype(methodDescriptor);
+ Message param = prototype.newBuilderForType()
+ .mergeFrom(rpcRequest.getRequest()).build();
+ Message result;
+ try {
+ result = service.callBlockingMethod(methodDescriptor, null, param);
+ } catch (ServiceException e) {
+ Throwable cause = e.getCause();
+ return handleException(cause != null ? cause : e);
+ } catch (Exception e) {
+ return handleException(e);
+ }
+
+ HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+ return new RpcResponseWritable(response);
+ }
+ }
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Sun Dec 4 20:44:36 2011
@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPol
@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
String protocolName(); // the name of the protocol (i.e. rpc service)
+ long protocolVersion() default -1; // default means not defined use old way
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Sun Dec 4 20:44:36 2011
@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
private void fetchServerMethods(Method method) throws IOException {
long clientVersion;
- try {
- Field versionField = method.getDeclaringClass().getField("versionID");
- versionField.setAccessible(true);
- clientVersion = versionField.getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
+ clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
int clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
- .getProtocolSignature(protocol.getName(), clientVersion,
+ .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
clientMethodsHash);
long serverVersion = serverInfo.getVersion();
if (serverVersion != clientVersion) {
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Sun Dec 4 20:44:36 2011
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class ProtocolSignature implements Writable {
static { // register a ctor
WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implement
/**
* A cache that maps a protocol's name to its signature & finger print
*/
- final private static HashMap<String, ProtocolSigFingerprint>
+ private final static HashMap<String, ProtocolSigFingerprint>
PROTOCOL_FINGERPRINT_CACHE =
new HashMap<String, ProtocolSigFingerprint>();
+ @VisibleForTesting
+ public static void resetCache() {
+ PROTOCOL_FINGERPRINT_CACHE.clear();
+ }
+
/**
* Return a protocol's signature and finger print from cache
*
@@ -177,7 +184,7 @@ public class ProtocolSignature implement
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
- String protocolName = protocol.getName();
+ String protocolName = RPC.getProtocolName(protocol);
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
if (sig == null) {
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sun Dec 4 20:44:36 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.ipc;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.io.*;
import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.Reflection
* the protocol instance is transmitted.
*/
public class RPC {
+
+ interface RpcInvoker {
+ /**
+ * Process a client call on the server side
+ * @param server the server within whose context this rpc call is made
+ * @param protocol - the protocol name (the class of the client proxy
+ * used to make calls to the rpc server.
+ * @param rpcRequest - deserialized
+ * @param receiveTime time at which the call received (for metrics)
+ * @return the call's return
+ * @throws IOException
+ **/
+ public Writable call(Server server, String protocol,
+ Writable rpcRequest, long receiveTime) throws IOException ;
+ }
+
static final Log LOG = LogFactory.getLog(RPC.class);
+ /**
+ * Get all superInterfaces that extend VersionedProtocol
+ * @param childInterfaces
+ * @return the super interfaces that extend VersionedProtocol
+ */
+ static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+ List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+ for (Class<?> childInterface : childInterfaces) {
+ if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+ allInterfaces.add(childInterface);
+ allInterfaces.addAll(
+ Arrays.asList(
+ getSuperInterfaces(childInterface.getInterfaces())));
+ } else {
+ LOG.warn("Interface " + childInterface +
+ " ignored because it does not extend VersionedProtocol");
+ }
+ }
+ return allInterfaces.toArray(new Class[allInterfaces.size()]);
+ }
+
+ /**
+ * Get all interfaces that the given protocol implements or extends
+ * which are assignable from VersionedProtocol.
+ */
+ static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+ Class<?>[] interfaces = protocol.getInterfaces();
+ return getSuperInterfaces(interfaces);
+ }
/**
* Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
if (protocol == null) {
return null;
}
- ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+ ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
return (anno == null) ? protocol.getName() : anno.protocolName();
}
+
+ /**
+ * Get the protocol version from protocol class.
+ * If the protocol class has a ProtocolAnnotation, then get the protocol
+ * name from the annotation; otherwise the class name is the protocol name.
+ */
+ static public long getProtocolVersion(Class<?> protocol) {
+ if (protocol == null) {
+ throw new IllegalArgumentException("Null protocol");
+ }
+ long version;
+ ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+ if (anno != null) {
+ version = anno.protocolVersion();
+ if (version != -1)
+ return version;
+ }
+ try {
+ Field versionField = protocol.getField("versionID");
+ versionField.setAccessible(true);
+ return versionField.getLong(protocol);
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
private RPC() {} // no public ctor
@@ -590,6 +668,144 @@ public class RPC {
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
+ boolean verbose;
+ static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length-1];
+ }
+
+ /**
+ * Store a map of protocol and version to its implementation
+ */
+ /**
+ * The key in Map
+ */
+ static class ProtoNameVer {
+ final String protocol;
+ final long version;
+ ProtoNameVer(String protocol, long ver) {
+ this.protocol = protocol;
+ this.version = ver;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (this == o)
+ return true;
+ if (! (o instanceof ProtoNameVer))
+ return false;
+ ProtoNameVer pv = (ProtoNameVer) o;
+ return ((pv.protocol.equals(this.protocol)) &&
+ (pv.version == this.version));
+ }
+ @Override
+ public int hashCode() {
+ return protocol.hashCode() * 37 + (int) version;
+ }
+ }
+
+ /**
+ * The value in map
+ */
+ static class ProtoClassProtoImpl {
+ final Class<?> protocolClass;
+ final Object protocolImpl;
+ ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+ this.protocolClass = protocolClass;
+ this.protocolImpl = protocolImpl;
+ }
+ }
+
+ ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =
+ new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+
+ Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+ if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+ for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+ protocolImplMapArray.add(
+ new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+ }
+ }
+ return protocolImplMapArray.get(rpcKind.ordinal());
+ }
+
+ // Register protocol and its impl for rpc calls
+ void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass,
+ Object protocolImpl) throws IOException {
+ String protocolName = RPC.getProtocolName(protocolClass);
+ long version;
+
+
+ try {
+ version = RPC.getProtocolVersion(protocolClass);
+ } catch (Exception ex) {
+ LOG.warn("Protocol " + protocolClass +
+ " NOT registered as cannot get protocol version ");
+ return;
+ }
+
+
+ getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+ new ProtoClassProtoImpl(protocolClass, protocolImpl));
+ LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName + " version=" + version +
+ " ProtocolImpl=" + protocolImpl.getClass().getName() +
+ " protocolClass=" + protocolClass.getName());
+ }
+
+ static class VerProtocolImpl {
+ final long version;
+ final ProtoClassProtoImpl protocolTarget;
+ VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+ this.version = ver;
+ this.protocolTarget = protocolTarget;
+ }
+ }
+
+
+ @SuppressWarnings("unused") // will be useful later.
+ VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+ String protocolName) {
+ VerProtocolImpl[] resultk =
+ new VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+ int i = 0;
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+ getProtocolImplMap(rpcKind).entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ resultk[i++] =
+ new VerProtocolImpl(pv.getKey().version, pv.getValue());
+ }
+ }
+ if (i == 0) {
+ return null;
+ }
+ VerProtocolImpl[] result = new VerProtocolImpl[i];
+ System.arraycopy(resultk, 0, result, 0, i);
+ return result;
+ }
+
+ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind,
+ String protocolName) {
+ Long highestVersion = 0L;
+ ProtoClassProtoImpl highest = null;
+ System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+ getProtocolImplMap(rpcKind).entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ if ((highest == null) || (pv.getKey().version > highestVersion)) {
+ highest = pv.getValue();
+ highestVersion = pv.getKey().version;
+ }
+ }
+ }
+ if (highest == null) {
+ return null;
+ }
+ return new VerProtocolImpl(highestVersion, highest);
+ }
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
@@ -606,11 +822,17 @@ public class RPC {
* @param protocolImpl - the impl of the protocol that will be called
* @return the server (for convenience)
*/
- public <PROTO, IMPL extends PROTO>
- Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
- ) throws IOException {
- throw new IOException("addProtocol Not Implemented");
+ public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+ Object protocolImpl) throws IOException {
+ registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+ return this;
+ }
+
+ @Override
+ public Writable call(RpcKind rpcKind, String protocol,
+ Writable rpcRequest, long receiveTime) throws IOException {
+ return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+ receiveTime);
}
}
-
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Sun Dec 4 20:44:36 2011
@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements
}
public enum RpcKind {
- RPC_BUILTIN ((short ) 1), // Used for built in calls
- RPC_WRITABLE ((short ) 2),
- RPC_PROTOCOL_BUFFER ((short)3),
- RPC_AVRO ((short)4);
-
+ RPC_BUILTIN ((short) 1), // Used for built in calls by tests
+ RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
+ RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+ RPC_AVRO ((short) 4); // Use AvroRpcEngine
+ static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+ private static final short FIRST_INDEX = RPC_BUILTIN.value;
private final short value;
- private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
RpcKind(short val) {
this.value = val;
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sun Dec 4 20:44:36 2011
@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteCha
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -133,6 +135,59 @@ public abstract class Server {
* Initial and max size of response buffer
*/
static int INITIAL_RESP_BUF_SIZE = 10240;
+
+ static class RpcKindMapValue {
+ final Class<? extends Writable> rpcRequestWrapperClass;
+ final RpcInvoker rpcInvoker;
+ RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+ RpcInvoker rpcInvoker) {
+ this.rpcInvoker = rpcInvoker;
+ this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+ }
+ }
+ static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+ HashMap<RpcKind, RpcKindMapValue>(4);
+
+
+
+ /**
+ * Register a RPC kind and the class to deserialize the rpc request.
+ *
+ * Called by static initializers of rpcKind Engines
+ * @param rpcKind
+ * @param rpcRequestWrapperClass - this class is used to deserialze the
+ * the rpc request.
+ * @param rpcInvoker - use to process the calls on SS.
+ */
+
+ public static void registerProtocolEngine(RpcKind rpcKind,
+ Class<? extends Writable> rpcRequestWrapperClass,
+ RpcInvoker rpcInvoker) {
+ RpcKindMapValue old =
+ rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+ if (old != null) {
+ rpcKindMap.put(rpcKind, old);
+ throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+ rpcKind);
+ }
+ LOG.info("rpcKind=" + rpcKind +
+ ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
+ ", rpcInvoker=" + rpcInvoker);
+ }
+
+ public Class<? extends Writable> getRpcRequestWrapper(
+ RpcKind rpcKind) {
+ if (rpcRequestClass != null)
+ return rpcRequestClass;
+ RpcKindMapValue val = rpcKindMap.get(rpcKind);
+ return (val == null) ? null : val.rpcRequestWrapperClass;
+ }
+
+ public static RpcInvoker getRpcInvoker(RpcKind rpcKind) {
+ RpcKindMapValue val = rpcKindMap.get(rpcKind);
+ return (val == null) ? null : val.rpcInvoker;
+ }
+
public static final Log LOG = LogFactory.getLog(Server.class);
public static final Log AUDITLOG =
@@ -197,7 +252,7 @@ public abstract class Server {
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
- private Class<? extends Writable> paramClass; // class of call parameters
+ private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
private int thresholdIdleConnections; // the number of idle connections
@@ -1425,9 +1480,27 @@ public abstract class Server {
throw new IOException("IPC Server does not implement operation" +
header.getOperation());
}
+ // If we know the rpc kind, get its class so that we can deserialize
+ // (Note it would make more sense to have the handler deserialize but
+ // we continue with this original design.
+ Class<? extends Writable> rpcRequestClass =
+ getRpcRequestWrapper(header.getkind());
+ if (rpcRequestClass == null) {
+ LOG.warn("Unknown rpc kind " + header.getkind() +
+ " from client " + getHostAddress());
+ final Call readParamsFailedCall =
+ new Call(header.getCallId(), null, this);
+ ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+ setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ IOException.class.getName(),
+ "Unknown rpc kind " + header.getkind());
+ responder.doRespond(readParamsFailedCall);
+ return;
+ }
Writable rpcRequest;
try { //Read the rpc request
- rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+ rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
@@ -1519,7 +1592,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
- value = call(call.connection.protocolName, call.rpcRequest,
+ value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
@@ -1528,7 +1601,7 @@ public abstract class Server {
@Override
public Writable run() throws Exception {
// make the call
- return call(call.connection.protocolName,
+ return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
}
@@ -1590,24 +1663,33 @@ public abstract class Server {
Configuration conf)
throws IOException
{
- this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+ this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+ .toString(port), null);
}
- /** Constructs a server listening on the named port and address. Parameters passed must
+ /**
+ * Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls.
* If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
* from configuration. Otherwise the configuration will be picked up.
+ *
+ * If rpcRequestClass is null then the rpcRequestClass must have been
+ * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+ * Class, RPC.RpcInvoker)}
+ * This parameter has been retained for compatibility with existing tests
+ * and usage.
*/
@SuppressWarnings("unchecked")
- protected Server(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
- Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
+ protected Server(String bindAddress, int port,
+ Class<? extends Writable> rpcRequestClass, int handlerCount,
+ int numReaders, int queueSizePerHandler, Configuration conf,
+ String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
- this.paramClass = paramClass;
+ this.rpcRequestClass = rpcRequestClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
if (queueSizePerHandler != -1) {
@@ -1797,17 +1879,17 @@ public abstract class Server {
/**
* Called for each call.
- * @deprecated Use {@link #call(String, Writable, long)} instead
+ * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, String,
+ * Writable, long)} instead
*/
@Deprecated
public Writable call(Writable param, long receiveTime) throws IOException {
- return call(null, param, receiveTime);
+ return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
}
/** Called for each call. */
- public abstract Writable call(String protocol,
- Writable param, long receiveTime)
- throws IOException;
+ public abstract Writable call(RpcKind rpcKind, String protocol,
+ Writable param, long receiveTime) throws IOException;
/**
* Authorize the incoming client connection.
@@ -1957,5 +2039,5 @@ public abstract class Server {
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
- }
+ }
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sun Dec 4 20:44:36 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.ipc;
-import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTarge
import java.net.InetSocketAddress;
import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
-
- /**
- * Get all superInterfaces that extend VersionedProtocol
- * @param childInterfaces
- * @return the super interfaces that extend VersionedProtocol
- */
- private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
- List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
- for (Class<?> childInterface : childInterfaces) {
- if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
- allInterfaces.add(childInterface);
- allInterfaces.addAll(
- Arrays.asList(
- getSuperInterfaces(childInterface.getInterfaces())));
- } else {
- LOG.warn("Interface " + childInterface +
- " ignored because it does not extend VersionedProtocol");
- }
- }
- return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
- }
-
- /**
- * Get all interfaces that the given protocol implements or extends
- * which are assignable from VersionedProtocol.
- */
- private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
- Class<?>[] interfaces = protocol.getInterfaces();
- return getSuperInterfaces(interfaces);
+ static { // Register the rpcRequest deserializer for WritableRpcEngine
+ org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+ Invocation.class, new Server.WritableRpcInvoker());
}
@@ -120,15 +88,7 @@ public class WritableRpcEngine implement
clientVersion = 0;
clientMethodsHash = 0;
} else {
- try {
- Field versionField = method.getDeclaringClass().getField("versionID");
- versionField.setAccessible(true);
- this.clientVersion = versionField.getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
+ this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
@@ -329,140 +289,25 @@ public class WritableRpcEngine implement
/** An RPC Server. */
public static class Server extends RPC.Server {
- private boolean verbose;
-
- /**
- * The key in Map
- */
- static class ProtoNameVer {
- final String protocol;
- final long version;
- ProtoNameVer(String protocol, long ver) {
- this.protocol = protocol;
- this.version = ver;
- }
- @Override
- public boolean equals(Object o) {
- if (o == null)
- return false;
- if (this == o)
- return true;
- if (! (o instanceof ProtoNameVer))
- return false;
- ProtoNameVer pv = (ProtoNameVer) o;
- return ((pv.protocol.equals(this.protocol)) &&
- (pv.version == this.version));
- }
- @Override
- public int hashCode() {
- return protocol.hashCode() * 37 + (int) version;
- }
- }
-
- /**
- * The value in map
- */
- static class ProtoClassProtoImpl {
- final Class<?> protocolClass;
- final Object protocolImpl;
- ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
- this.protocolClass = protocolClass;
- this.protocolImpl = protocolImpl;
- }
- }
-
- private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap =
- new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-
- // Register protocol and its impl for rpc calls
- private void registerProtocolAndImpl(Class<?> protocolClass,
- Object protocolImpl) throws IOException {
- String protocolName = RPC.getProtocolName(protocolClass);
- VersionedProtocol vp = (VersionedProtocol) protocolImpl;
- long version;
- try {
- version = vp.getProtocolVersion(protocolName, 0);
- } catch (Exception ex) {
- LOG.warn("Protocol " + protocolClass +
- " NOT registered as getProtocolVersion throws exception ");
- return;
- }
- protocolImplMap.put(new ProtoNameVer(protocolName, version),
- new ProtoClassProtoImpl(protocolClass, protocolImpl));
- LOG.debug("Protocol Name = " + protocolName + " version=" + version +
- " ProtocolImpl=" + protocolImpl.getClass().getName() +
- " protocolClass=" + protocolClass.getName());
- }
-
- private static class VerProtocolImpl {
- final long version;
- final ProtoClassProtoImpl protocolTarget;
- VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
- this.version = ver;
- this.protocolTarget = protocolTarget;
- }
- }
-
-
- @SuppressWarnings("unused") // will be useful later.
- private VerProtocolImpl[] getSupportedProtocolVersions(
- String protocolName) {
- VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()];
- int i = 0;
- for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
- protocolImplMap.entrySet()) {
- if (pv.getKey().protocol.equals(protocolName)) {
- resultk[i++] =
- new VerProtocolImpl(pv.getKey().version, pv.getValue());
- }
- }
- if (i == 0) {
- return null;
- }
- VerProtocolImpl[] result = new VerProtocolImpl[i];
- System.arraycopy(resultk, 0, result, 0, i);
- return result;
- }
-
- private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {
- Long highestVersion = 0L;
- ProtoClassProtoImpl highest = null;
- for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
- .entrySet()) {
- if (pv.getKey().protocol.equals(protocolName)) {
- if ((highest == null) || (pv.getKey().version > highestVersion)) {
- highest = pv.getValue();
- highestVersion = pv.getKey().version;
- }
- }
- }
- if (highest == null) {
- return null;
- }
- return new VerProtocolImpl(highestVersion, highest);
- }
-
-
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
*
- * @deprecated Use #Server(Class, Object, Configuration, String, int)
- *
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
*/
@Deprecated
public Server(Object instance, Configuration conf, String bindAddress,
- int port)
- throws IOException {
+ int port) throws IOException {
this(null, instance, conf, bindAddress, port);
}
/** Construct an RPC server.
- * @param protocol class
- * @param instance the instance whose methods will be called
+ * @param protocolClass class
+ * @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implement
false, null);
}
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length-1];
- }
-
-
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param protocolImpl the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implement
}
- /** Construct an RPC server.
+ /**
+ * Construct an RPC server.
* @param protocolClass - the protocol being registered
* can be null for compatibility with old usage (see below for details)
* @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implement
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+ super(bindAddress, port, null, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(protocolImpl.getClass().getName()), secretManager);
@@ -535,7 +373,7 @@ public class WritableRpcEngine implement
* the protocolImpl is derived from the protocolClass(es)
* we register all interfaces extended by the protocolImpl
*/
- protocols = getProtocolInterfaces(protocolImpl.getClass());
+ protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
} else {
if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implement
protocolImpl.getClass());
}
// register protocol class and its super interfaces
- registerProtocolAndImpl(protocolClass, protocolImpl);
- protocols = getProtocolInterfaces(protocolClass);
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+ protocols = RPC.getProtocolInterfaces(protocolClass);
}
for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) {
- registerProtocolAndImpl(p, protocolImpl);
+ registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
}
}
}
-
- @Override
- public <PROTO, IMPL extends PROTO> Server
- addProtocol(
- Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
- registerProtocolAndImpl(protocolClass, protocolImpl);
- return this;
+ private static void log(String value) {
+ if (value!= null && value.length() > 55)
+ value = value.substring(0, 55)+"...";
+ LOG.info(value);
}
- /**
- * Process a client call
- * @param protocolName - the protocol name (the class of the client proxy
- * used to make calls to the rpc server.
- * @param param parameters
- * @param receivedTime time at which the call receoved (for metrics)
- * @return the call's return
- * @throws IOException
- */
- public Writable call(String protocolName, Writable param, long receivedTime)
- throws IOException {
- try {
- Invocation call = (Invocation)param;
- if (verbose) log("Call: " + call);
-
- // Verify rpc version
- if (call.getRpcVersion() != writableRpcVersion) {
- // Client is using a different version of WritableRpc
- throw new IOException(
- "WritableRpc version mismatch, client side version="
- + call.getRpcVersion() + ", server side version="
- + writableRpcVersion);
- }
+ static class WritableRpcInvoker implements RpcInvoker {
+
+ @Override
+ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+ String protocolName, Writable rpcRequest, long receivedTime)
+ throws IOException {
+ try {
+ Invocation call = (Invocation)rpcRequest;
+ if (server.verbose) log("Call: " + call);
+
+ // Verify rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new IOException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
- long clientVersion = call.getProtocolVersion();
- final String protoName;
- ProtoClassProtoImpl protocolImpl;
- if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
- // VersionProtocol methods are often used by client to figure out
- // which version of protocol to use.
- //
- // Versioned protocol methods should go the protocolName protocol
- // rather than the declaring class of the method since the
- // the declaring class is VersionedProtocol which is not
- // registered directly.
- // Send the call to the highest protocol version
- protocolImpl =
- getHighestSupportedProtocol(protocolName).protocolTarget;
- } else {
- protoName = call.declaringClassProtocolName;
-
- // Find the right impl for the protocol based on client version.
- ProtoNameVer pv =
- new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
- protocolImpl = protocolImplMap.get(pv);
- if (protocolImpl == null) { // no match for Protocol AND Version
- VerProtocolImpl highest =
- getHighestSupportedProtocol(protoName);
+ long clientVersion = call.getProtocolVersion();
+ final String protoName;
+ ProtoClassProtoImpl protocolImpl;
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+ // VersionProtocol methods are often used by client to figure out
+ // which version of protocol to use.
+ //
+ // Versioned protocol methods should go the protocolName protocol
+ // rather than the declaring class of the method since the
+ // the declaring class is VersionedProtocol which is not
+ // registered directly.
+ // Send the call to the highest protocol version
+ VerProtocolImpl highest = server.getHighestSupportedProtocol(
+ RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) {
- throw new IOException("Unknown protocol: " + protoName);
- } else { // protocol supported but not the version that client wants
- throw new RPC.VersionMismatch(protoName, clientVersion,
- highest.version);
+ throw new IOException("Unknown protocol: " + protocolName);
+ }
+ protocolImpl = highest.protocolTarget;
+ } else {
+ protoName = call.declaringClassProtocolName;
+
+ // Find the right impl for the protocol based on client version.
+ ProtoNameVer pv =
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+ protocolImpl =
+ server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE,
+ protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ } else { // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
}
}
- }
-
+
- // Invoke the protocol method
+ // Invoke the protocol method
- long startTime = System.currentTimeMillis();
- Method method =
- protocolImpl.protocolClass.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
- rpcDetailedMetrics.init(protocolImpl.protocolClass);
- Object value =
- method.invoke(protocolImpl.protocolImpl, call.getParameters());
- int processingTime = (int) (System.currentTimeMillis() - startTime);
- int qTime = (int) (startTime-receivedTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + call.getMethodName() +
- " queueTime= " + qTime +
- " procesingTime= " + processingTime);
- }
- rpcMetrics.addRpcQueueTime(qTime);
- rpcMetrics.addRpcProcessingTime(processingTime);
- rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
- processingTime);
- if (verbose) log("Return: "+value);
-
- return new ObjectWritable(method.getReturnType(), value);
-
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- throw (IOException)target;
- } else {
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
+ long startTime = System.currentTimeMillis();
+ Method method =
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+ server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ Object value =
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime-receivedTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Served: " + call.getMethodName() +
+ " queueTime= " + qTime +
+ " procesingTime= " + processingTime);
+ }
+ server.rpcMetrics.addRpcQueueTime(qTime);
+ server.rpcMetrics.addRpcProcessingTime(processingTime);
+ server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+ processingTime);
+ if (server.verbose) log("Return: "+value);
+
+ return new ObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
throw ioe;
}
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
}
}
}
-
- private static void log(String value) {
- if (value!= null && value.length() > 55)
- value = value.substring(0, 55)+"...";
- LOG.info(value);
- }
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java Sun Dec 4 20:44:36 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: hadoop_rpc.proto
@@ -18,6 +35,14 @@ public final class HadoopRpcProtos {
// optional bytes request = 2;
boolean hasRequest();
com.google.protobuf.ByteString getRequest();
+
+ // required string declaringClassProtocolName = 3;
+ boolean hasDeclaringClassProtocolName();
+ String getDeclaringClassProtocolName();
+
+ // required uint64 clientProtocolVersion = 4;
+ boolean hasClientProtocolVersion();
+ long getClientProtocolVersion();
}
public static final class HadoopRpcRequestProto extends
com.google.protobuf.GeneratedMessage
@@ -90,9 +115,53 @@ public final class HadoopRpcProtos {
return request_;
}
+ // required string declaringClassProtocolName = 3;
+ public static final int DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER = 3;
+ private java.lang.Object declaringClassProtocolName_;
+ public boolean hasDeclaringClassProtocolName() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getDeclaringClassProtocolName() {
+ java.lang.Object ref = declaringClassProtocolName_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ declaringClassProtocolName_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getDeclaringClassProtocolNameBytes() {
+ java.lang.Object ref = declaringClassProtocolName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ declaringClassProtocolName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ // required uint64 clientProtocolVersion = 4;
+ public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 4;
+ private long clientProtocolVersion_;
+ public boolean hasClientProtocolVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getClientProtocolVersion() {
+ return clientProtocolVersion_;
+ }
+
private void initFields() {
methodName_ = "";
request_ = com.google.protobuf.ByteString.EMPTY;
+ declaringClassProtocolName_ = "";
+ clientProtocolVersion_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -103,6 +172,14 @@ public final class HadoopRpcProtos {
memoizedIsInitialized = 0;
return false;
}
+ if (!hasDeclaringClassProtocolName()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasClientProtocolVersion()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -116,6 +193,12 @@ public final class HadoopRpcProtos {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, request_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBytes(3, getDeclaringClassProtocolNameBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeUInt64(4, clientProtocolVersion_);
+ }
getUnknownFields().writeTo(output);
}
@@ -133,6 +216,14 @@ public final class HadoopRpcProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, request_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(3, getDeclaringClassProtocolNameBytes());
+ }
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(4, clientProtocolVersion_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -166,6 +257,16 @@ public final class HadoopRpcProtos {
result = result && getRequest()
.equals(other.getRequest());
}
+ result = result && (hasDeclaringClassProtocolName() == other.hasDeclaringClassProtocolName());
+ if (hasDeclaringClassProtocolName()) {
+ result = result && getDeclaringClassProtocolName()
+ .equals(other.getDeclaringClassProtocolName());
+ }
+ result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion());
+ if (hasClientProtocolVersion()) {
+ result = result && (getClientProtocolVersion()
+ == other.getClientProtocolVersion());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -183,6 +284,14 @@ public final class HadoopRpcProtos {
hash = (37 * hash) + REQUEST_FIELD_NUMBER;
hash = (53 * hash) + getRequest().hashCode();
}
+ if (hasDeclaringClassProtocolName()) {
+ hash = (37 * hash) + DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getDeclaringClassProtocolName().hashCode();
+ }
+ if (hasClientProtocolVersion()) {
+ hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getClientProtocolVersion());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -303,6 +412,10 @@ public final class HadoopRpcProtos {
bitField0_ = (bitField0_ & ~0x00000001);
request_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
+ declaringClassProtocolName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000004);
+ clientProtocolVersion_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -349,6 +462,14 @@ public final class HadoopRpcProtos {
to_bitField0_ |= 0x00000002;
}
result.request_ = request_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.declaringClassProtocolName_ = declaringClassProtocolName_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.clientProtocolVersion_ = clientProtocolVersion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -371,6 +492,12 @@ public final class HadoopRpcProtos {
if (other.hasRequest()) {
setRequest(other.getRequest());
}
+ if (other.hasDeclaringClassProtocolName()) {
+ setDeclaringClassProtocolName(other.getDeclaringClassProtocolName());
+ }
+ if (other.hasClientProtocolVersion()) {
+ setClientProtocolVersion(other.getClientProtocolVersion());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -380,6 +507,14 @@ public final class HadoopRpcProtos {
return false;
}
+ if (!hasDeclaringClassProtocolName()) {
+
+ return false;
+ }
+ if (!hasClientProtocolVersion()) {
+
+ return false;
+ }
return true;
}
@@ -416,6 +551,16 @@ public final class HadoopRpcProtos {
request_ = input.readBytes();
break;
}
+ case 26: {
+ bitField0_ |= 0x00000004;
+ declaringClassProtocolName_ = input.readBytes();
+ break;
+ }
+ case 32: {
+ bitField0_ |= 0x00000008;
+ clientProtocolVersion_ = input.readUInt64();
+ break;
+ }
}
}
}
@@ -482,6 +627,63 @@ public final class HadoopRpcProtos {
return this;
}
+ // required string declaringClassProtocolName = 3;
+ private java.lang.Object declaringClassProtocolName_ = "";
+ public boolean hasDeclaringClassProtocolName() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public String getDeclaringClassProtocolName() {
+ java.lang.Object ref = declaringClassProtocolName_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ declaringClassProtocolName_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setDeclaringClassProtocolName(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000004;
+ declaringClassProtocolName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearDeclaringClassProtocolName() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ declaringClassProtocolName_ = getDefaultInstance().getDeclaringClassProtocolName();
+ onChanged();
+ return this;
+ }
+ void setDeclaringClassProtocolName(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000004;
+ declaringClassProtocolName_ = value;
+ onChanged();
+ }
+
+ // required uint64 clientProtocolVersion = 4;
+ private long clientProtocolVersion_ ;
+ public boolean hasClientProtocolVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public long getClientProtocolVersion() {
+ return clientProtocolVersion_;
+ }
+ public Builder setClientProtocolVersion(long value) {
+ bitField0_ |= 0x00000008;
+ clientProtocolVersion_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearClientProtocolVersion() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ clientProtocolVersion_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:HadoopRpcRequestProto)
}
@@ -1706,16 +1908,18 @@ public final class HadoopRpcProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\020hadoop_rpc.proto\"<\n\025HadoopRpcRequestPr" +
+ "\n\020hadoop_rpc.proto\"\177\n\025HadoopRpcRequestPr" +
"oto\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002 \001(\014" +
- "\"D\n\027HadoopRpcExceptionProto\022\025\n\rexception" +
- "Name\030\001 \001(\t\022\022\n\nstackTrace\030\002 \001(\t\"\272\001\n\026Hadoo" +
- "pRpcResponseProto\0226\n\006status\030\001 \002(\0162&.Hado" +
- "opRpcResponseProto.ResponseStatus\022\020\n\010res" +
- "ponse\030\002 \001(\014\022+\n\texception\030\003 \001(\0132\030.HadoopR" +
- "pcExceptionProto\")\n\016ResponseStatus\022\013\n\007SU" +
- "CCESS\020\001\022\n\n\006ERRROR\020\002B4\n\036org.apache.hadoop" +
- ".ipc.protobufB\017HadoopRpcProtos\240\001\001"
+ "\022\"\n\032declaringClassProtocolName\030\003 \002(\t\022\035\n\025" +
+ "clientProtocolVersion\030\004 \002(\004\"D\n\027HadoopRpc" +
+ "ExceptionProto\022\025\n\rexceptionName\030\001 \001(\t\022\022\n" +
+ "\nstackTrace\030\002 \001(\t\"\272\001\n\026HadoopRpcResponseP" +
+ "roto\0226\n\006status\030\001 \002(\0162&.HadoopRpcResponse" +
+ "Proto.ResponseStatus\022\020\n\010response\030\002 \001(\014\022+" +
+ "\n\texception\030\003 \001(\0132\030.HadoopRpcExceptionPr" +
+ "oto\")\n\016ResponseStatus\022\013\n\007SUCCESS\020\001\022\n\n\006ER",
+ "RROR\020\002B4\n\036org.apache.hadoop.ipc.protobuf" +
+ "B\017HadoopRpcProtos\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1727,7 +1931,7 @@ public final class HadoopRpcProtos {
internal_static_HadoopRpcRequestProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_HadoopRpcRequestProto_descriptor,
- new java.lang.String[] { "MethodName", "Request", },
+ new java.lang.String[] { "MethodName", "Request", "DeclaringClassProtocolName", "ClientProtocolVersion", },
org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.class,
org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.Builder.class);
internal_static_HadoopRpcExceptionProto_descriptor =
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto Sun Dec 4 20:44:36 2011
@@ -34,6 +34,12 @@ message HadoopRpcRequestProto {
/** Bytes corresponding to the client protobuf request */
optional bytes request = 2;
+
+ /** protocol name of class declaring the called method */
+ required string declaringClassProtocolName = 3;
+
+ /** protocol version of class declaring the called method */
+ required uint64 clientProtocolVersion = 4;
}
/**
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java Sun Dec 4 20:44:36 2011
@@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
@@ -101,7 +102,8 @@ public class TestAvroRpc extends TestCas
RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
ADDRESS, 0, 5, true, conf, sm);
- server.addProtocol(AvroTestProtocol.class, new TestImpl());
+ server.addProtocol(RpcKind.RPC_WRITABLE,
+ AvroTestProtocol.class, new TestImpl());
try {
server.start();
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sun Dec 4 20:44:36 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
@@ -96,8 +97,8 @@ public class TestIPC {
}
@Override
- public Writable call(String protocol, Writable param, long receiveTime)
- throws IOException {
+ public Writable call(RpcKind rpcKind, String protocol, Writable param,
+ long receiveTime) throws IOException {
if (sleep) {
// sleep a bit
try {