You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2011/09/03 02:31:06 UTC
svn commit: r1164771 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Author: sradia
Date: Sat Sep 3 00:31:05 2011
New Revision: 1164771
URL: http://svn.apache.org/viewvc?rev=1164771&view=rev
Log:
HADOOP-7524 and MapReduce-2887 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sat Sep 3 00:31:05 2011
@@ -5,6 +5,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm)
+ HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
Release 0.23.0 - Unreleased
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sat Sep 3 00:31:05 2011
@@ -285,8 +285,8 @@ public class Client {
authMethod = AuthMethod.KERBEROS;
}
- header = new ConnectionHeader(protocol == null ? null : protocol
- .getName(), ticket, authMethod);
+ header =
+ new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol "
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1164771&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Sat Sep 3 00:31:05 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+
+/**
+ * The protocol name that is used when a client and server connect.
+ * By default the class name of the protocol interface is the protocol name.
+ *
+ * Why override the default name (i.e. the class name)?
+ * One use case overriding the default name (i.e. the class name) is when
+ * there are multiple implementations of the same protocol, each with say a
+ * different version/serialization.
+ * In Hadoop this is used to allow multiple server and client adapters
+ * for different versions of the same protocol service.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ProtocolInfo {
+ String protocolName(); // the name of the protocol (i.e. rpc service)
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sat Sep 3 00:31:05 2011
@@ -62,6 +62,20 @@ import org.apache.hadoop.util.Reflection
*/
public class RPC {
static final Log LOG = LogFactory.getLog(RPC.class);
+
+
+ /**
+ * Get the protocol name.
+ * If the protocol class has a ProtocolAnnotation, then get the protocol
+ * name from the annotation; otherwise the class name is the protocol name.
+ */
+ static public String getProtocolName(Class<?> protocol) {
+ if (protocol == null) {
+ return null;
+ }
+ ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+ return (anno == null) ? protocol.getName() : anno.protocolName();
+ }
private RPC() {} // no public ctor
@@ -553,8 +567,10 @@ public class RPC {
}
/** Construct a server for a protocol implementation instance. */
- public static Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
+
+ public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
+ Server getServer(Class<PROTO> protocol,
+ IMPL instance, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
@@ -576,6 +592,18 @@ public class RPC {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager);
}
+
+ /**
+ * Add a protocol to the existing server.
+ * @param protocolClass - the protocol class
+ * @param protocolImpl - the impl of the protocol that will be called
+ * @return the server (for convenience)
+ */
+ public <PROTO extends VersionedProtocol, IMPL extends PROTO>
+ Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
+ ) throws IOException {
+ throw new IOException("addProtocol Not Implemented");
+ }
}
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sat Sep 3 00:31:05 2011
@@ -900,7 +900,7 @@ public abstract class Server {
private InetAddress addr;
ConnectionHeader header = new ConnectionHeader();
- Class<?> protocol;
+ String protocolName;
boolean useSasl;
SaslServer saslServer;
private AuthMethod authMethod;
@@ -1287,15 +1287,8 @@ public abstract class Server {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in);
- try {
- String protocolClassName = header.getProtocol();
- if (protocolClassName != null) {
- protocol = getProtocolClass(header.getProtocol(), conf);
- rpcDetailedMetrics.init(protocol);
- }
- } catch (ClassNotFoundException cnfe) {
- throw new IOException("Unknown protocol: " + header.getProtocol());
- }
+ protocolName = header.getProtocol();
+
UserGroupInformation protocolUser = header.getUgi();
if (!useSasl) {
@@ -1484,7 +1477,7 @@ public abstract class Server {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
- value = call(call.connection.protocol, call.param,
+ value = call(call.connection.protocolName, call.param,
call.timestamp);
} else {
value =
@@ -1493,7 +1486,7 @@ public abstract class Server {
@Override
public Writable run() throws Exception {
// make the call
- return call(call.connection.protocol,
+ return call(call.connection.protocolName,
call.param, call.timestamp);
}
@@ -1753,7 +1746,7 @@ public abstract class Server {
/**
* Called for each call.
- * @deprecated Use {@link #call(Class, Writable, long)} instead
+ * @deprecated Use {@link #call(String, Writable, long)} instead
*/
@Deprecated
public Writable call(Writable param, long receiveTime) throws IOException {
@@ -1761,7 +1754,7 @@ public abstract class Server {
}
/** Called for each call. */
- public abstract Writable call(Class<?> protocol,
+ public abstract Writable call(String protocol,
Writable param, long receiveTime)
throws IOException;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Sat Sep 3 00:31:05 2011
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
* @return the version that the server will speak
* @throws IOException if any IO error occurs
*/
- @Deprecated
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sat Sep 3 00:31:05 2011
@@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTarge
import java.net.InetSocketAddress;
import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -35,6 +38,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -47,10 +51,46 @@ import org.apache.hadoop.conf.*;
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
+
+ /**
+ * Get all superInterfaces that extend VersionedProtocol
+ * @param childInterfaces
+ * @return the super interfaces that extend VersionedProtocol
+ */
+ private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+ List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+ for (Class<?> childInterface : childInterfaces) {
+ if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+ allInterfaces.add(childInterface);
+ allInterfaces.addAll(
+ Arrays.asList(
+ getSuperInterfaces(childInterface.getInterfaces())));
+ } else {
+ LOG.warn("Interface " + childInterface +
+ " ignored because it does not extend VersionedProtocol");
+ }
+ }
+ return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
+ }
+
+ /**
+ * Get all interfaces that the given protocol implements or extends
+ * which are assignable from VersionedProtocol.
+ */
+ private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+ Class<?>[] interfaces = protocol.getInterfaces();
+ return getSuperInterfaces(interfaces);
+ }
+
+
//writableRpcVersion should be updated if there is a change
//in format of the rpc messages.
- public static long writableRpcVersion = 1L;
+
+ // 2L - added declared class to Invocation
+ public static final long writableRpcVersion = 2L;
+
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
private String methodName;
@@ -59,11 +99,13 @@ public class WritableRpcEngine implement
private Configuration conf;
private long clientVersion;
private int clientMethodsHash;
+ private String declaringClassProtocolName;
//This could be different from static writableRpcVersion when received
//at server, if client is using a different version.
private long rpcVersion;
+ @SuppressWarnings("unused") // called when deserializing an invocation
public Invocation() {}
public Invocation(Method method, Object[] parameters) {
@@ -88,6 +130,8 @@ public class WritableRpcEngine implement
this.clientMethodsHash = ProtocolSignature.getFingerprint(method
.getDeclaringClass().getMethods());
}
+ this.declaringClassProtocolName =
+ RPC.getProtocolName(method.getDeclaringClass());
}
/** The name of the method invoked. */
@@ -103,6 +147,7 @@ public class WritableRpcEngine implement
return clientVersion;
}
+ @SuppressWarnings("unused")
private int getClientMethodsHash() {
return clientMethodsHash;
}
@@ -115,8 +160,10 @@ public class WritableRpcEngine implement
return rpcVersion;
}
+ @SuppressWarnings("deprecation")
public void readFields(DataInput in) throws IOException {
rpcVersion = in.readLong();
+ declaringClassProtocolName = UTF8.readString(in);
methodName = UTF8.readString(in);
clientVersion = in.readLong();
clientMethodsHash = in.readInt();
@@ -124,13 +171,16 @@ public class WritableRpcEngine implement
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+ parameters[i] =
+ ObjectWritable.readObject(in, objectWritable, this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
+ @SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
+ UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
@@ -273,30 +323,161 @@ public class WritableRpcEngine implement
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
- public Server getServer(Class<?> protocol,
- Object instance, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler,
- boolean verbose, Configuration conf,
+ public RPC.Server getServer(Class<?> protocolClass,
+ Object protocolImpl, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
- return new Server(instance, conf, bindAddress, port, numHandlers,
- numReaders, queueSizePerHandler, verbose, secretManager);
+ return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
}
+
/** An RPC Server. */
public static class Server extends RPC.Server {
- private Object instance;
private boolean verbose;
+
+ /**
+ * The key in Map
+ */
+ static class ProtoNameVer {
+ final String protocol;
+ final long version;
+ ProtoNameVer(String protocol, long ver) {
+ this.protocol = protocol;
+ this.version = ver;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (this == o)
+ return true;
+ if (! (o instanceof ProtoNameVer))
+ return false;
+ ProtoNameVer pv = (ProtoNameVer) o;
+ return ((pv.protocol.equals(this.protocol)) &&
+ (pv.version == this.version));
+ }
+ @Override
+ public int hashCode() {
+ return protocol.hashCode() * 37 + (int) version;
+ }
+ }
+
+ /**
+ * The value in map
+ */
+ static class ProtoClassProtoImpl {
+ final Class<?> protocolClass;
+ final Object protocolImpl;
+ ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+ this.protocolClass = protocolClass;
+ this.protocolImpl = protocolImpl;
+ }
+ }
+
+ private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap =
+ new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
+
+ // Register protocol and its impl for rpc calls
+ private void registerProtocolAndImpl(Class<?> protocolClass,
+ Object protocolImpl) throws IOException {
+ String protocolName = RPC.getProtocolName(protocolClass);
+ VersionedProtocol vp = (VersionedProtocol) protocolImpl;
+ long version;
+ try {
+ version = vp.getProtocolVersion(protocolName, 0);
+ } catch (Exception ex) {
+ LOG.warn("Protocol " + protocolClass +
+ " NOT registered as getProtocolVersion throws exception ");
+ return;
+ }
+ protocolImplMap.put(new ProtoNameVer(protocolName, version),
+ new ProtoClassProtoImpl(protocolClass, protocolImpl));
+ LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() +
+ " protocolClass=" + protocolClass.getName() + " version=" + version);
+ }
+
+ private static class VerProtocolImpl {
+ final long version;
+ final ProtoClassProtoImpl protocolTarget;
+ VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+ this.version = ver;
+ this.protocolTarget = protocolTarget;
+ }
+ }
+
+
+ @SuppressWarnings("unused") // will be useful later.
+ private VerProtocolImpl[] getSupportedProtocolVersions(
+ String protocolName) {
+ VerProtocolImpl[] resultk = new VerProtocolImpl[protocolImplMap.size()];
+ int i = 0;
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+ protocolImplMap.entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ resultk[i++] =
+ new VerProtocolImpl(pv.getKey().version, pv.getValue());
+ }
+ }
+ if (i == 0) {
+ return null;
+ }
+ VerProtocolImpl[] result = new VerProtocolImpl[i];
+ System.arraycopy(resultk, 0, result, 0, i);
+ return result;
+ }
+
+ private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {
+ Long highestVersion = 0L;
+ ProtoClassProtoImpl highest = null;
+ for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
+ .entrySet()) {
+ if (pv.getKey().protocol.equals(protocolName)) {
+ if ((highest == null) || (pv.getKey().version > highestVersion)) {
+ highest = pv.getValue();
+ highestVersion = pv.getKey().version;
+ }
+ }
+ }
+ if (highest == null) {
+ return null;
+ }
+ return new VerProtocolImpl(highestVersion, highest);
+ }
+
/** Construct an RPC server.
* @param instance the instance whose methods will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
+ *
+ * @deprecated Use #Server(Class, Object, Configuration, String, int)
+ *
+ */
+ @Deprecated
+ public Server(Object instance, Configuration conf, String bindAddress,
+ int port)
+ throws IOException {
+ this(null, instance, conf, bindAddress, port);
+ }
+
+
+ /** Construct an RPC server.
+ * @param protocol class
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port)
throws IOException {
- this(instance, conf, bindAddress, port, 1, -1, -1, false, null);
+ this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1,
+ false, null);
}
private static String classNameBase(String className) {
@@ -307,35 +488,103 @@ public class WritableRpcEngine implement
return names[names.length-1];
}
+
/** Construct an RPC server.
- * @param instance the instance whose methods will be called
+ * @param protocolImpl the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ *
+ * @deprecated use Server#Server(Class, Object,
+ * Configuration, String, int, int, int, int, boolean, SecretManager)
+ */
+ @Deprecated
+ public Server(Object protocolImpl, Configuration conf, String bindAddress,
+ int port, int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ this(null, protocolImpl, conf, bindAddress, port,
+ numHandlers, numReaders, queueSizePerHandler, verbose,
+ secretManager);
+
+ }
+
+ /** Construct an RPC server.
+ * @param protocolClass - the protocol being registered
+ * can be null for compatibility with old usage (see below for details)
+ * @param protocolImpl the protocol impl that will be called
* @param conf the configuration to use
* @param bindAddress the address to bind on to listen for connection
* @param port the port to listen for connections on
* @param numHandlers the number of method handler threads to run
* @param verbose whether each call should be logged
*/
- public Server(Object instance, Configuration conf, String bindAddress, int port,
- int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,
- SecretManager<? extends TokenIdentifier> secretManager)
+ public Server(Class<?> protocolClass, Object protocolImpl,
+ Configuration conf, String bindAddress, int port,
+ int numHandlers, int numReaders, int queueSizePerHandler,
+ boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, numReaders,
queueSizePerHandler, conf,
- classNameBase(instance.getClass().getName()), secretManager);
- this.instance = instance;
+ classNameBase(protocolImpl.getClass().getName()), secretManager);
+
this.verbose = verbose;
+
+
+ Class<?>[] protocols;
+ if (protocolClass == null) { // derive protocol from impl
+ /*
+ * In order to remain compatible with the old usage where a single
+ * target protocolImpl is suppled for all protocol interfaces, and
+ * the protocolImpl is derived from the protocolClass(es)
+ * we register all interfaces extended by the protocolImpl
+ */
+ protocols = getProtocolInterfaces(protocolImpl.getClass());
+
+ } else {
+ if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+ throw new IOException("protocolClass "+ protocolClass +
+ " is not implemented by protocolImpl which is of class " +
+ protocolImpl.getClass());
+ }
+ // register protocol class and its super interfaces
+ registerProtocolAndImpl(protocolClass, protocolImpl);
+ protocols = getProtocolInterfaces(protocolClass);
+ }
+ for (Class<?> p : protocols) {
+ if (!p.equals(VersionedProtocol.class)) {
+ registerProtocolAndImpl(p, protocolImpl);
+ }
+ }
+
}
- public Writable call(Class<?> protocol, Writable param, long receivedTime)
+
+ @Override
+ public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
+ addProtocol(
+ Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
+ registerProtocolAndImpl(protocolClass, protocolImpl);
+ return this;
+ }
+
+ /**
+ * Process a client call
+ * @param protocolName - the protocol name (the class of the client proxy
+ * used to make calls to the rpc server.
+ * @param param parameters
+ * @param receivedTime time at which the call receoved (for metrics)
+ * @return the call's return
+ * @throws IOException
+ */
+ public Writable call(String protocolName, Writable param, long receivedTime)
throws IOException {
try {
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
- Method method = protocol.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
-
// Verify rpc version
if (call.getRpcVersion() != writableRpcVersion) {
// Client is using a different version of WritableRpc
@@ -344,25 +593,51 @@ public class WritableRpcEngine implement
+ call.getRpcVersion() + ", server side version="
+ writableRpcVersion);
}
-
- //Verify protocol version.
- //Bypass the version check for VersionedProtocol
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
- long clientVersion = call.getProtocolVersion();
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
- .getProtocolSignature(protocol.getCanonicalName(), call
- .getProtocolVersion(), call.getClientMethodsHash());
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- LOG.warn("Version mismatch: client version=" + clientVersion
- + ", server version=" + serverVersion);
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
+
+ long clientVersion = call.getProtocolVersion();
+ final String protoName;
+ ProtoClassProtoImpl protocolImpl;
+ if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+ // VersionProtocol methods are often used by client to figure out
+ // which version of protocol to use.
+ //
+ // Versioned protocol methods should go the protocolName protocol
+ // rather than the declaring class of the method since the
+ // the declaring class is VersionedProtocol which is not
+ // registered directly.
+ // Send the call to the highest protocol version
+ protocolImpl =
+ getHighestSupportedProtocol(protocolName).protocolTarget;
+ } else {
+ protoName = call.declaringClassProtocolName;
+
+ // Find the right impl for the protocol based on client version.
+ ProtoNameVer pv =
+ new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+ protocolImpl = protocolImplMap.get(pv);
+ if (protocolImpl == null) { // no match for Protocol AND Version
+ VerProtocolImpl highest =
+ getHighestSupportedProtocol(protoName);
+ if (highest == null) {
+ throw new IOException("Unknown protocol: " + protoName);
+ } else { // protocol supported but not the version that client wants
+ throw new RPC.VersionMismatch(protoName, clientVersion,
+ highest.version);
+ }
}
}
+
+
+ // Invoke the protocol method
long startTime = System.currentTimeMillis();
- Object value = method.invoke(instance, call.getParameters());
+ Method method =
+ protocolImpl.protocolClass.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+ rpcDetailedMetrics.init(protocolImpl.protocolClass);
+ Object value =
+ method.invoke(protocolImpl.protocolImpl, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sat Sep 3 00:31:05 2011
@@ -97,7 +97,7 @@ public class TestIPC {
}
@Override
- public Writable call(Class<?> protocol, Writable param, long receiveTime)
+ public Writable call(String protocol, Writable param, long receiveTime)
throws IOException {
if (sleep) {
// sleep a bit
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Sat Sep 3 00:31:05 2011
@@ -72,7 +72,7 @@ public class TestIPCServerResponder exte
}
@Override
- public Writable call(Class<?> protocol, Writable param, long receiveTime)
+ public Writable call(String protocol, Writable param, long receiveTime)
throws IOException {
if (sleep) {
try {
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java?rev=1164771&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java Sat Sep 3 00:31:05 2011
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestMultipleProtocolServer {
+ private static final String ADDRESS = "0.0.0.0";
+ private static InetSocketAddress addr;
+ private static RPC.Server server;
+
+ private static Configuration conf = new Configuration();
+
+
+ @ProtocolInfo(protocolName="Foo")
+ interface Foo0 extends VersionedProtocol {
+ public static final long versionID = 0L;
+ String ping() throws IOException;
+
+ }
+
+ @ProtocolInfo(protocolName="Foo")
+ interface Foo1 extends VersionedProtocol {
+ public static final long versionID = 1L;
+ String ping() throws IOException;
+ String ping2() throws IOException;
+ }
+
+ @ProtocolInfo(protocolName="Foo")
+ interface FooUnimplemented extends VersionedProtocol {
+ public static final long versionID = 2L;
+ String ping() throws IOException;
+ }
+
+ interface Mixin extends VersionedProtocol{
+ public static final long versionID = 0L;
+ void hello() throws IOException;
+ }
+ interface Bar extends Mixin, VersionedProtocol {
+ public static final long versionID = 0L;
+ int echo(int i) throws IOException;
+ }
+
+
+
+ class Foo0Impl implements Foo0 {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo0.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public String ping() {
+ return "Foo0";
+ }
+
+ }
+
+ class Foo1Impl implements Foo1 {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Foo1.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public String ping() {
+ return "Foo1";
+ }
+
+ @Override
+ public String ping2() {
+ return "Foo1";
+
+ }
+
+ }
+
+
+ class BarImpl implements Bar {
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return Bar.versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().
+ getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public int echo(int i) {
+ return i;
+ }
+
+ @Override
+ public void hello() {
+
+
+ }
+ }
+ @Before
+ public void setUp() throws Exception {
+ // create a server with two handlers
+ server = RPC.getServer(Foo0.class,
+ new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(Foo1.class, new Foo1Impl());
+ server.addProtocol(Bar.class, new BarImpl());
+ server.addProtocol(Mixin.class, new BarImpl());
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ }
+
+ @Test
+ public void test1() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
+
+ Foo0 foo0 = (Foo0)proxy.getProxy();
+ Assert.assertEquals("Foo0", foo0.ping());
+
+
+ proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+
+
+ Foo1 foo1 = (Foo1)proxy.getProxy();
+ Assert.assertEquals("Foo1", foo1.ping());
+ Assert.assertEquals("Foo1", foo1.ping());
+
+
+ proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
+
+
+ Bar bar = (Bar)proxy.getProxy();
+ Assert.assertEquals(99, bar.echo(99));
+
+ // Now test Mixin class method
+
+ Mixin mixin = bar;
+ mixin.hello();
+ }
+
+
+ // Server does not implement the FooUnimplemented version of protocol Foo.
+ // See that calls to it fail.
+ @Test(expected=IOException.class)
+ public void testNonExistingProtocol() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ foo.ping();
+ }
+
+
+ /**
+ * getProtocolVersion of an unimplemented version should return highest version
+ * Similarly getProtocolSignature should work.
+ * @throws IOException
+ */
+ @Test
+ public void testNonExistingProtocol2() throws IOException {
+ ProtocolProxy<?> proxy;
+ proxy = RPC.getProtocolProxy(FooUnimplemented.class,
+ FooUnimplemented.versionID, addr, conf);
+
+ FooUnimplemented foo = (FooUnimplemented)proxy.getProxy();
+ Assert.assertEquals(Foo1.versionID,
+ foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID));
+ foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class),
+ FooUnimplemented.versionID, 0);
+ }
+
+ @Test(expected=IOException.class)
+ public void testIncorrectServerCreation() throws IOException {
+ RPC.getServer(Foo1.class,
+ new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+ }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Sat Sep 3 00:31:05 2011
@@ -39,7 +39,7 @@ import org.junit.Test;
public class TestRPCCompatibility {
private static final String ADDRESS = "0.0.0.0";
private static InetSocketAddress addr;
- private static Server server;
+ private static RPC.Server server;
private ProtocolProxy<?> proxy;
public static final Log LOG =
@@ -52,10 +52,12 @@ public class TestRPCCompatibility {
void ping() throws IOException;
}
- public interface TestProtocol1 extends TestProtocol0 {
+ public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
String echo(String value) throws IOException;
}
+ @ProtocolInfo(protocolName=
+ "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol2 extends TestProtocol1 {
int echo(int value) throws IOException;
}
@@ -89,11 +91,23 @@ public class TestRPCCompatibility {
public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
@Override
public String echo(String value) { return value; }
+ @Override
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ return TestProtocol1.versionID;
+ }
}
public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
@Override
public int echo(int value) { return value; }
+
+ @Override
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ return TestProtocol2.versionID;
+ }
+
}
@After
@@ -109,8 +123,10 @@ public class TestRPCCompatibility {
@Test // old client vs new server
public void testVersion0ClientVersion1Server() throws Exception {
// create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class,
- new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -172,8 +188,10 @@ public class TestRPCCompatibility {
@Test // Compatible new client & old server
public void testVersion2ClientVersion1Server() throws Exception {
// create a server with two handlers
+ TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class,
- new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -190,8 +208,10 @@ public class TestRPCCompatibility {
@Test // equal version client and server
public void testVersion2ClientVersion2Server() throws Exception {
// create a server with two handlers
+ TestImpl2 impl = new TestImpl2();
server = RPC.getServer(TestProtocol2.class,
- new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+ impl, ADDRESS, 0, 2, false, conf, null);
+ server.addProtocol(TestProtocol0.class, impl);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -250,14 +270,16 @@ public class TestRPCCompatibility {
assertEquals(hash1, hash2);
}
+ @ProtocolInfo(protocolName=
+ "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
public interface TestProtocol4 extends TestProtocol2 {
- public static final long versionID = 1L;
+ public static final long versionID = 4L;
int echo(int value) throws IOException;
}
@Test
public void testVersionMismatch() throws IOException {
- server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+ server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
@@ -268,7 +290,8 @@ public class TestRPCCompatibility {
proxy.echo(21);
fail("The call must throw VersionMismatch exception");
} catch (IOException ex) {
- Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+ Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(),
+ ex.getMessage().contains("VersionMismatch"));
}
}
}
\ No newline at end of file