You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2011/05/26 20:25:21 UTC
svn commit: r1128018 - in /hadoop/common/branches/yahoo-merge/src:
java/org/apache/hadoop/ipc/ test/core/org/apache/hadoop/ipc/
test/core/org/apache/hadoop/security/
Author: jitendra
Date: Thu May 26 18:25:20 2011
New Revision: 1128018
URL: http://svn.apache.org/viewvc?rev=1128018&view=rev
Log:
Merged r1099284 and r1064919 for HADOOP-6904 & HADOOP-7227 from trunk.
Added:
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolSignature.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
Modified:
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RpcEngine.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPC.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu May 26 18:25:20 2011
@@ -61,6 +61,8 @@ public class AvroRpcEngine implements Rp
/** Tunnel an Avro RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol {
+ //WritableRpcEngine expects a versionID in every protocol.
+ public static final long versionID = 0L;
/** All Avro methods and responses go through this. */
BufferListWritable call(BufferListWritable request) throws IOException;
}
@@ -107,10 +109,9 @@ public class AvroRpcEngine implements Rp
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
- this.tunnel =
- (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION,
+ this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
addr, ticket, conf, factory,
- rpcTimeout);
+ rpcTimeout).getProxy();
this.remote = addr;
}
@@ -135,16 +136,20 @@ public class AvroRpcEngine implements Rp
}
/** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public Object getProxy(Class<?> protocol, long clientVersion,
+ * talking to a server at the named address.
+ * @param <T>*/
+ @SuppressWarnings("unchecked")
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
- return Proxy.newProxyInstance
- (protocol.getClassLoader(),
- new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+ return new ProtocolProxy<T>(protocol,
+ (T)Proxy.newProxyInstance(
+ protocol.getClassLoader(),
+ new Class[] { protocol },
+ new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
+ false);
}
/** Stop this proxy. */
@@ -191,11 +196,19 @@ public class AvroRpcEngine implements Rp
responder = createResponder(iface, impl);
}
+ @Override
public long getProtocolVersion(String protocol, long version)
- throws IOException {
+ throws IOException {
return VERSION;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(
+ String protocol, long version, int clientMethodsHashCode)
+ throws IOException {
+ return new ProtocolSignature(VERSION, null);
+ }
+
public BufferListWritable call(final BufferListWritable request)
throws IOException {
return new BufferListWritable(responder.respond(request.buffers));
Added: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1128018&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (added)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Thu May 26 18:25:20 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+
+
+/**
+ * A class that wraps a server's proxy,
+ * containing a list of its supported methods.
+ *
+ * A list of methods with a value of null indicates that the client and server
+ * have the same protocol.
+ */
+public class ProtocolProxy<T> {
+ private Class<T> protocol;
+ private T proxy;
+ private HashSet<Integer> serverMethods = null;
+ final private boolean supportServerMethodCheck;
+ private boolean serverMethodsFetched = false;
+
+ /**
+ * Constructor
+ *
+ * @param protocol protocol class
+ * @param proxy its proxy
+ * @param supportServerMethodCheck If false proxy will never fetch server
+ * methods and isMethodSupported will always return true. If true,
+ * server methods will be fetched for the first call to
+ * isMethodSupported.
+ */
+ public ProtocolProxy(Class<T> protocol, T proxy,
+ boolean supportServerMethodCheck) {
+ this.protocol = protocol;
+ this.proxy = proxy;
+ this.supportServerMethodCheck = supportServerMethodCheck;
+ }
+
+ private void fetchServerMethods(Method method) throws IOException {
+ long clientVersion;
+ try {
+ Field versionField = method.getDeclaringClass().getField("versionID");
+ versionField.setAccessible(true);
+ clientVersion = versionField.getLong(method.getDeclaringClass());
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ int clientMethodsHash = ProtocolSignature.getFingerprint(method
+ .getDeclaringClass().getMethods());
+ ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
+ .getProtocolSignature(protocol.getName(), clientVersion,
+ clientMethodsHash);
+ long serverVersion = serverInfo.getVersion();
+ if (serverVersion != clientVersion) {
+ throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ int[] serverMethodsCodes = serverInfo.getMethods();
+ if (serverMethodsCodes != null) {
+ serverMethods = new HashSet<Integer>(serverMethodsCodes.length);
+ for (int m : serverMethodsCodes) {
+ this.serverMethods.add(Integer.valueOf(m));
+ }
+ }
+ serverMethodsFetched = true;
+ }
+
+ /**
+ * Get the proxy
+ */
+ public T getProxy() {
+ return proxy;
+ }
+
+ /**
+ * Check if a method is supported by the server or not
+ *
+ * @param methodName a method's name in String format
+ * @param parameterTypes a method's parameter types
+ * @return true if the method is supported by the server
+ */
+ public synchronized boolean isMethodSupported(String methodName,
+ Class<?>... parameterTypes)
+ throws IOException {
+ if (!supportServerMethodCheck) {
+ return true;
+ }
+ Method method;
+ try {
+ method = protocol.getDeclaredMethod(methodName, parameterTypes);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException(e);
+ }
+ if (!serverMethodsFetched) {
+ fetchServerMethods(method);
+ }
+ if (serverMethods == null) { // client & server have the same protocol
+ return true;
+ }
+ return serverMethods.contains(
+ Integer.valueOf(ProtocolSignature.getFingerprint(method)));
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1128018&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolSignature.java (added)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/ProtocolSignature.java Thu May 26 18:25:20 2011
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+public class ProtocolSignature implements Writable {
+ static { // register a ctor
+ WritableFactories.setFactory
+ (ProtocolSignature.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new ProtocolSignature(); }
+ });
+ }
+
+ private long version;
+ private int[] methods = null; // an array of method hash codes
+
+ /**
+ * default constructor
+ */
+ public ProtocolSignature() {
+ }
+
+ /**
+ * Constructor
+ *
+ * @param version server version
+ * @param methodHashcodes hash codes of the methods supported by server
+ */
+ public ProtocolSignature(long version, int[] methodHashcodes) {
+ this.version = version;
+ this.methods = methodHashcodes;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ public int[] getMethods() {
+ return methods;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ version = in.readLong();
+ boolean hasMethods = in.readBoolean();
+ if (hasMethods) {
+ int numMethods = in.readInt();
+ methods = new int[numMethods];
+ for (int i=0; i<numMethods; i++) {
+ methods[i] = in.readInt();
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(version);
+ if (methods == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeInt(methods.length);
+ for (int method : methods) {
+ out.writeInt(method);
+ }
+ }
+ }
+
+ /**
+ * Calculate a method's hash code considering its method
+ * name, returning type, and its parameter types
+ *
+ * @param method a method
+ * @return its hash code
+ */
+ static int getFingerprint(Method method) {
+ int hashcode = method.getName().hashCode();
+ hashcode = hashcode + 31*method.getReturnType().getName().hashCode();
+ for (Class<?> type : method.getParameterTypes()) {
+ hashcode = 31*hashcode ^ type.getName().hashCode();
+ }
+ return hashcode;
+ }
+
+ /**
+ * Convert an array of Method into an array of hash codes
+ *
+ * @param methods
+ * @return array of hash codes
+ */
+ private static int[] getFingerprints(Method[] methods) {
+ if (methods == null) {
+ return null;
+ }
+ int[] hashCodes = new int[methods.length];
+ for (int i = 0; i<methods.length; i++) {
+ hashCodes[i] = getFingerprint(methods[i]);
+ }
+ return hashCodes;
+ }
+
+ /**
+ * Get the hash code of an array of methods
+ * Methods are sorted before hashcode is calculated.
+ * So the returned value is independent of the method order in the array.
+ *
+ * @param methods an array of methods
+ * @return the hash code
+ */
+ static int getFingerprint(Method[] methods) {
+ return getFingerprint(getFingerprints(methods));
+ }
+
+ /**
+ * Get the hash code of an array of hashcodes
+ * Hashcodes are sorted before hashcode is calculated.
+ * So the returned value is independent of the hashcode order in the array.
+ *
+ * @param hashcodes an array of method hash codes
+ * @return the hash code
+ */
+ static int getFingerprint(int[] hashcodes) {
+ Arrays.sort(hashcodes);
+ return Arrays.hashCode(hashcodes);
+
+ }
+ private static class ProtocolSigFingerprint {
+ private ProtocolSignature signature;
+ private int fingerprint;
+
+ ProtocolSigFingerprint(ProtocolSignature sig, int fingerprint) {
+ this.signature = sig;
+ this.fingerprint = fingerprint;
+ }
+ }
+
+ /**
+ * A cache that maps a protocol's name to its signature & finger print
+ */
+ final private static HashMap<String, ProtocolSigFingerprint>
+ PROTOCOL_FINGERPRINT_CACHE =
+ new HashMap<String, ProtocolSigFingerprint>();
+
+ /**
+ * Return a protocol's signature and finger print from cache
+ *
+ * @param protocol a protocol class
+ * @param serverVersion protocol version
+ * @return its signature and finger print
+ */
+ private static ProtocolSigFingerprint getSigFingerprint(
+ Class <? extends VersionedProtocol> protocol, long serverVersion) {
+ String protocolName = protocol.getName();
+ synchronized (PROTOCOL_FINGERPRINT_CACHE) {
+ ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
+ if (sig == null) {
+ int[] serverMethodHashcodes = getFingerprints(protocol.getMethods());
+ sig = new ProtocolSigFingerprint(
+ new ProtocolSignature(serverVersion, serverMethodHashcodes),
+ getFingerprint(serverMethodHashcodes));
+ PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig);
+ }
+ return sig;
+ }
+ }
+
+ /**
+ * Get a server protocol's signature
+ *
+ * @param clientMethodsHashCode client protocol methods hashcode
+ * @param serverVersion server protocol version
+ * @param protocol protocol
+ * @return the server's protocol signature
+ */
+ static ProtocolSignature getProtocolSignature(
+ int clientMethodsHashCode,
+ long serverVersion,
+ Class<? extends VersionedProtocol> protocol) {
+ // try to get the finger print & signature from the cache
+ ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion);
+
+ // check if the client side protocol matches the one on the server side
+ if (clientMethodsHashCode == sig.fingerprint) {
+ return new ProtocolSignature(serverVersion, null); // null indicates a match
+ }
+
+ return sig.signature;
+ }
+
+ /**
+ * Get a server protocol's signature
+ *
+ * @param server server implementation
+ * @param protocol server protocol
+ * @param clientVersion client's version
+ * @param clientMethodsHash client's protocol's hash code
+ * @return the server protocol's signature
+ * @throws IOException if any error occurs
+ */
+ @SuppressWarnings("unchecked")
+ public static ProtocolSignature getProtocolSigature(VersionedProtocol server,
+ String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)Class.forName(protocol);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ long serverVersion = server.getProtocolVersion(protocol, clientVersion);
+ return ProtocolSignature.getProtocolSignature(
+ clientMethodsHash, serverVersion, inter);
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RPC.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RPC.java Thu May 26 18:25:20 2011
@@ -61,7 +61,7 @@ import org.apache.hadoop.util.Reflection
* the protocol instance is transmitted.
*/
public class RPC {
- private static final Log LOG = LogFactory.getLog(RPC.class);
+ static final Log LOG = LogFactory.getLog(RPC.class);
private RPC() {} // no public ctor
@@ -156,18 +156,48 @@ public class RPC {
return serverVersion;
}
}
-
- public static Object waitForProxy(
- Class<?> protocol,
+
+ /**
+ * Get a proxy connection to a remote server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @return the proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> T waitForProxy(
+ Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf
) throws IOException {
- return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @return the protocol proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ return waitForProtocolProxy(
+ protocol, clientVersion, addr, conf, Long.MAX_VALUE);
}
/**
* Get a proxy connection to a remote server
+ *
* @param protocol protocol class
* @param clientVersion client version
* @param addr remote address
@@ -176,23 +206,68 @@ public class RPC {
* @return the proxy
* @throws IOException if the far end through a RemoteException
*/
- public static Object waitForProxy(Class<?> protocol, long clientVersion,
+ public static <T> T waitForProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException {
- return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ return waitForProtocolProxy(protocol, clientVersion, addr,
+ conf, connTimeout).getProxy();
}
- /**
- * Get a proxy connection to a remote server
- * @param protocol protocol class
- * @param clientVersion client version
- * @param addr remote address
- * @param conf configuration to use
- * @param rpcTimeout timeout for each RPC
- * @param timeout time in milliseconds before giving up
- * @return the proxy
- * @throws IOException if the far end through a RemoteException
- */
- public static Object waitForProxy(Class<?> protocol, long clientVersion,
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param connTimeout time in milliseconds before giving up
+ * @return the protocol proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr, Configuration conf,
+ long connTimeout) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ }
+
+ /**
+ * Get a proxy connection to a remote server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param rpcTimeout timeout for each RPC
+ * @param timeout time in milliseconds before giving up
+ * @return the proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> T waitForProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr, Configuration conf,
+ int rpcTimeout,
+ long timeout) throws IOException {
+ return waitForProtocolProxy(protocol, clientVersion, addr,
+ conf, rpcTimeout, timeout).getProxy();
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param rpcTimeout timeout for each RPC
+ * @param timeout time in milliseconds before giving up
+ * @return the protocol proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
+ long clientVersion,
InetSocketAddress addr, Configuration conf,
int rpcTimeout,
long timeout) throws IOException {
@@ -200,7 +275,7 @@ public class RPC {
IOException ioe;
while (true) {
try {
- return getProxy(protocol, clientVersion, addr,
+ return getProtocolProxy(protocol, clientVersion, addr,
UserGroupInformation.getCurrentUser(), conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
} catch(ConnectException se) { // namenode has not been started
@@ -228,27 +303,76 @@ public class RPC {
}
/** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public static Object getProxy(Class<?> protocol, long clientVersion,
+ * talking to a server at the named address.
+ * @param <T>*/
+ public static <T> T getProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr, Configuration conf,
+ SocketFactory factory) throws IOException {
+ return getProtocolProxy(
+ protocol, clientVersion, addr, conf, factory).getProxy();
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param factory socket factory
+ * @return the protocol proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+ long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
+ return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory);
}
/** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public static Object getProxy(Class<?> protocol, long clientVersion,
+ * talking to a server at the named address.
+ * @param <T>*/
+ public static <T> T getProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Configuration conf,
+ SocketFactory factory) throws IOException {
+ return getProtocolProxy(
+ protocol, clientVersion, addr, ticket, conf, factory).getProxy();
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param ticket user group information
+ * @param conf configuration to use
+ * @param factory socket factory
+ * @return the protocol proxy
+ * @throws IOException if the far end throws a RemoteException
+ */
+ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+ long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory) throws IOException {
- return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+ return getProtocolProxy(
+ protocol, clientVersion, addr, ticket, conf, factory, 0);
}
/**
* Construct a client-side proxy that implements the named protocol,
* talking to a server at the named address.
+ * @param <T>
*
* @param protocol protocol
* @param clientVersion client's version
@@ -260,7 +384,33 @@ public class RPC {
* @return the proxy
* @throws IOException if any error occurs
*/
- public static Object getProxy(Class<?> protocol, long clientVersion,
+ public static <T> T getProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Configuration conf,
+ SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ return getProtocolProxy(protocol, clientVersion, addr, ticket,
+ conf, factory, rpcTimeout).getProxy();
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol
+ * @param clientVersion client's version
+ * @param addr server address
+ * @param ticket security ticket
+ * @param conf configuration
+ * @param factory socket factory
+ * @param rpcTimeout max time for each rpc; 0 means no timeout
+ * @return the protocol proxy
+ * @throws IOException if any error occurs
+ */
+ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+ long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
@@ -273,21 +423,42 @@ public class RPC {
clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
+ /**
+ * Construct a client-side proxy object with the default SocketFactory
+ * @param <T>
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @return a proxy instance
+ * @throws IOException
+ */
+ public static <T> T getProxy(Class<T> protocol,
+ long clientVersion,
+ InetSocketAddress addr, Configuration conf)
+ throws IOException {
+
+ return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
+ }
+
/**
- * Construct a client-side proxy object with the default SocketFactory
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
*
* @param protocol
* @param clientVersion
* @param addr
* @param conf
- * @return a proxy instance
+ * @return a protocol proxy
* @throws IOException
*/
- public static Object getProxy(Class<?> protocol, long clientVersion,
+ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+ long clientVersion,
InetSocketAddress addr, Configuration conf)
throws IOException {
- return getProxy(protocol, clientVersion, addr, conf, NetUtils
+ return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf));
}
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu May 26 18:25:20 2011
@@ -34,8 +34,9 @@ import org.apache.hadoop.security.token.
@InterfaceStability.Evolving
public interface RpcEngine {
- /** Construct a client-side proxy object. */
- Object getProxy(Class<?> protocol,
+ /** Construct a client-side proxy object.
+ * @param <T>*/
+ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
long clientVersion, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Thu May 26 18:25:20 2011
@@ -32,7 +32,23 @@ public interface VersionedProtocol {
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak
+ * @throws IOException if any IO error occurs
*/
- public long getProtocolVersion(String protocol,
+ @Deprecated
+ public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
+
+ /**
+ * Return protocol signature corresponding to protocol interface.
+ * @param protocol The classname of the protocol interface
+ * @param clientVersion The version of the protocol that the client speaks
+ * @param clientMethodsHash the hashcode of client protocol methods
+ * @return the server protocol signature containing its version and
+ * a list of its supported methods
+ * @see ProtocolSignature#getProtocolSigature(VersionedProtocol, String,
+ * long, int) for a default implementation
+ */
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion,
+ int clientMethodsHash) throws IOException;
}
Modified: hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/yahoo-merge/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu May 26 18:25:20 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.ipc;
+import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
@@ -45,6 +46,10 @@ import org.apache.hadoop.conf.*;
@InterfaceStability.Evolving
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
+
+ //writableRpcVersion should be updated if there is a change
+ //in format of the rpc messages.
+ public static long writableRpcVersion = 1L;
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
@@ -52,6 +57,12 @@ public class WritableRpcEngine implement
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
+ private long clientVersion;
+ private int clientMethodsHash;
+
+ //This could be different from static writableRpcVersion when received
+ //at server, if client is using a different version.
+ private long rpcVersion;
public Invocation() {}
@@ -59,6 +70,24 @@ public class WritableRpcEngine implement
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
+ rpcVersion = writableRpcVersion;
+ if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ //VersionedProtocol is exempted from version check.
+ clientVersion = 0;
+ clientMethodsHash = 0;
+ } else {
+ try {
+ Field versionField = method.getDeclaringClass().getField("versionID");
+ versionField.setAccessible(true);
+ this.clientVersion = versionField.getLong(method.getDeclaringClass());
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ this.clientMethodsHash = ProtocolSignature.getFingerprint(method
+ .getDeclaringClass().getMethods());
+ }
}
/** The name of the method invoked. */
@@ -69,9 +98,28 @@ public class WritableRpcEngine implement
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
+
+ private long getProtocolVersion() {
+ return clientVersion;
+ }
+
+ private int getClientMethodsHash() {
+ return clientMethodsHash;
+ }
+
+ /**
+ * Returns the rpc version used by the client.
+ * @return rpcVersion
+ */
+ public long getRpcVersion() {
+ return rpcVersion;
+ }
public void readFields(DataInput in) throws IOException {
+ rpcVersion = in.readLong();
methodName = UTF8.readString(in);
+ clientVersion = in.readLong();
+ clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
@@ -82,7 +130,10 @@ public class WritableRpcEngine implement
}
public void write(DataOutput out) throws IOException {
+ out.writeLong(rpcVersion);
UTF8.writeString(out, methodName);
+ out.writeLong(clientVersion);
+ out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -100,6 +151,9 @@ public class WritableRpcEngine implement
buffer.append(parameters[i]);
}
buffer.append(")");
+ buffer.append(", rpc version="+rpcVersion);
+ buffer.append(", client version="+clientVersion);
+ buffer.append(", methodsFingerPrint="+clientMethodsHash);
return buffer.toString();
}
@@ -219,25 +273,20 @@ public class WritableRpcEngine implement
}
/** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public Object getProxy(Class<?> protocol, long clientVersion,
+ * talking to a server at the named address.
+ * @param <T> the protocol interface type implemented by the proxy */
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
- Object proxy = Proxy.newProxyInstance
- (protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- if (proxy instanceof VersionedProtocol) {
- long serverVersion = ((VersionedProtocol)proxy)
- .getProtocolVersion(protocol.getName(), clientVersion);
- if (serverVersion != clientVersion) {
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
- return proxy;
+ T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+ new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+ factory, rpcTimeout));
+ return new ProtocolProxy<T>(protocol, proxy, true);
}
/**
@@ -342,6 +391,31 @@ public class WritableRpcEngine implement
call.getParameterClasses());
method.setAccessible(true);
+ // Verify rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new IOException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
+
+ //Verify protocol version.
+ //Bypass the version check for VersionedProtocol
+ if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ long clientVersion = call.getProtocolVersion();
+ ProtocolSignature serverInfo = ((VersionedProtocol) instance)
+ .getProtocolSignature(protocol.getCanonicalName(), call
+ .getProtocolVersion(), call.getClientMethodsHash());
+ long serverVersion = serverInfo.getVersion();
+ if (serverVersion != clientVersion) {
+ LOG.warn("Version mismatch: client version=" + clientVersion
+ + ", server version=" + serverVersion);
+ throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ }
+
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
Modified: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java (original)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java Thu May 26 18:25:20 2011
@@ -132,6 +132,15 @@ public class MiniRPCBenchmark {
throw new IOException("Unknown protocol: " + protocol);
}
+ @Override // VersionedProtocol
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion,
+ int clientMethodsHashCode) throws IOException {
+ if (protocol.equals(MiniProtocol.class.getName()))
+ return new ProtocolSignature(versionID, null);
+ throw new IOException("Unknown protocol: " + protocol);
+ }
+
@Override // MiniProtocol
public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
Modified: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPC.java Thu May 26 18:25:20 2011
@@ -77,6 +77,11 @@ public class TestRPC extends TestCase {
return TestProtocol.versionID;
}
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int hashcode) {
+ return new ProtocolSignature(TestProtocol.versionID, null);
+ }
+
public void ping() {}
public synchronized void slowPing(boolean shouldSlow) {
@@ -253,11 +258,10 @@ public class TestRPC extends TestCase {
// Check rpcMetrics
MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
- // Number 4 includes getProtocolVersion()
- assertCounter("RpcProcessingTimeNumOps", 4L, rb);
+ assertCounter("RpcProcessingTimeNumOps", 3L, rb);
assertCounterGt("SentBytes", 0L, rb);
assertCounterGt("ReceivedBytes", 0L, rb);
-
+
// Number of calls to echo method should be 2
rb = getMetrics(server.rpcDetailedMetrics.name());
assertCounter("EchoNumOps", 2L, rb);
@@ -335,8 +339,9 @@ public class TestRPC extends TestCase {
public void testStandaloneClient() throws IOException {
try {
- RPC.waitForProxy(TestProtocol.class,
+ TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
+ proxy.echo("");
fail("We should not have reached here");
} catch (ConnectException ioe) {
//this is what we expected
@@ -454,6 +459,7 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
+ proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
@@ -479,6 +485,7 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, mulitServerAddr, conf);
+ proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
Added: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1128018&view=auto
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (added)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Thu May 26 18:25:20 2011
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.After;
+import org.junit.Test;
+
+/** Unit test for supporting method-name based compatible RPCs. */
+public class TestRPCCompatibility {
+ private static final String ADDRESS = "0.0.0.0";
+ private static InetSocketAddress addr;
+ private static Server server;
+ private ProtocolProxy<?> proxy;
+
+ public static final Log LOG =
+ LogFactory.getLog(TestRPCCompatibility.class);
+
+ private static Configuration conf = new Configuration();
+
+ public interface TestProtocol0 extends VersionedProtocol {
+ public static final long versionID = 0L;
+ void ping() throws IOException;
+ }
+
+ public interface TestProtocol1 extends TestProtocol0 {
+ String echo(String value) throws IOException;
+ }
+
+ public interface TestProtocol2 extends TestProtocol1 {
+ int echo(int value) throws IOException;
+ }
+
+ public static class TestImpl0 implements TestProtocol0 {
+ @Override
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ return versionID;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHashCode)
+ throws IOException {
+ Class<? extends VersionedProtocol> inter;
+ try {
+ inter = (Class<? extends VersionedProtocol>)getClass().getGenericInterfaces()[0];
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ProtocolSignature.getProtocolSignature(clientMethodsHashCode,
+ getProtocolVersion(protocol, clientVersion), inter);
+ }
+
+ @Override
+ public void ping() { return; }
+ }
+
+ public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
+ @Override
+ public String echo(String value) { return value; }
+ }
+
+ public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
+ @Override
+ public int echo(int value) { return value; }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (proxy != null) {
+ RPC.stopProxy(proxy.getProxy());
+ }
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Test // old client vs new server
+ public void testVersion0ClientVersion1Server() throws Exception {
+ // create a server with two handlers
+ server = RPC.getServer(TestProtocol1.class,
+ new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ proxy = RPC.getProtocolProxy(
+ TestProtocol0.class, TestProtocol0.versionID, addr, conf);
+
+ TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy();
+ proxy0.ping();
+ }
+
+ @Test // new client vs old server
+ public void testVersion1ClientVersion0Server() throws Exception {
+ // create a server with two handlers
+ server = RPC.getServer(TestProtocol0.class,
+ new TestImpl0(), ADDRESS, 0, 2, false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ proxy = RPC.getProtocolProxy(
+ TestProtocol1.class, TestProtocol1.versionID, addr, conf);
+
+ TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy();
+ proxy1.ping();
+ try {
+ proxy1.echo("hello");
+ fail("Echo should fail");
+ } catch(IOException e) {
+ }
+ }
+
+ private class Version2Client {
+
+ private TestProtocol2 proxy2;
+ private ProtocolProxy<TestProtocol2> serverInfo;
+
+ private Version2Client() throws IOException {
+ serverInfo = RPC.getProtocolProxy(
+ TestProtocol2.class, TestProtocol2.versionID, addr, conf);
+ proxy2 = serverInfo.getProxy();
+ }
+
+ public int echo(int value) throws IOException, NumberFormatException {
+ if (serverInfo.isMethodSupported("echo", int.class)) {
+ return -value; // server supports echo(int); negated result marks this path
+ } else { // server lacks echo(int); fall back to echo(String)
+ return Integer.parseInt(proxy2.echo(String.valueOf(value)));
+ }
+ }
+
+ public String echo(String value) throws IOException {
+ return proxy2.echo(value);
+ }
+
+ public void ping() throws IOException {
+ proxy2.ping();
+ }
+ }
+
+ @Test // Compatible new client & old server
+ public void testVersion2ClientVersion1Server() throws Exception {
+ // create a server with two handlers
+ server = RPC.getServer(TestProtocol1.class,
+ new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+
+ Version2Client client = new Version2Client();
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // echo(int) is not supported by server, so returning 3
+ // This verifies that echo(int) and echo(String)'s hash codes are different
+ assertEquals(3, client.echo(3));
+ }
+
+ @Test // equal version client and server
+ public void testVersion2ClientVersion2Server() throws Exception {
+ // create a server with two handlers
+ server = RPC.getServer(TestProtocol2.class,
+ new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ Version2Client client = new Version2Client();
+
+ client.ping();
+ assertEquals("hello", client.echo("hello"));
+
+ // now that echo(int) is supported by the server, echo(int) should return -3
+ assertEquals(-3, client.echo(3));
+ }
+
+ public interface TestProtocol3 {
+ int echo(String value);
+ int echo(int value);
+ int echo_alias(int value);
+ int echo(int value1, int value2);
+ }
+
+ @Test
+ public void testHashCode() throws Exception {
+ // make sure that overloaded methods have different hashcodes
+ Method strMethod = TestProtocol3.class.getMethod("echo", String.class);
+ int stringEchoHash = ProtocolSignature.getFingerprint(strMethod);
+ Method intMethod = TestProtocol3.class.getMethod("echo", int.class);
+ int intEchoHash = ProtocolSignature.getFingerprint(intMethod);
+ assertFalse(stringEchoHash == intEchoHash);
+
+ // make sure methods with the same signature
+ // from different declaring classes have the same hash code
+ int intEchoHash1 = ProtocolSignature.getFingerprint(
+ TestProtocol2.class.getMethod("echo", int.class));
+ assertEquals(intEchoHash, intEchoHash1);
+
+ // Methods with the same name and parameter types but different return
+ // types have different hash codes
+ int stringEchoHash1 = ProtocolSignature.getFingerprint(
+ TestProtocol2.class.getMethod("echo", String.class));
+ assertFalse(stringEchoHash == stringEchoHash1);
+
+ // Make sure that methods with the same return type and parameter types
+ // but different method names have different hash codes
+ int intEchoHashAlias = ProtocolSignature.getFingerprint(
+ TestProtocol3.class.getMethod("echo_alias", int.class));
+ assertFalse(intEchoHash == intEchoHashAlias);
+
+ // Make sure that methods with the same return type and method name but
+ // a larger number of parameter types have different hash codes
+ int intEchoHash2 = ProtocolSignature.getFingerprint(
+ TestProtocol3.class.getMethod("echo", int.class, int.class));
+ assertFalse(intEchoHash == intEchoHash2);
+
+ // make sure that methods order does not matter for method array hash code
+ int hash1 = ProtocolSignature.getFingerprint(new Method[] {intMethod, strMethod});
+ int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
+ assertEquals(hash1, hash2);
+ }
+
+ public interface TestProtocol4 extends TestProtocol2 {
+ public static final long versionID = 1L;
+ int echo(int value) throws IOException;
+ }
+
+ @Test
+ public void testVersionMismatch() throws IOException {
+ server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+ false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+ TestProtocol4.versionID, addr, conf);
+ try {
+ proxy.echo(21);
+ fail("The call must throw VersionMismatch exception");
+ } catch (IOException ex) {
+ Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java Thu May 26 18:25:20 2011
@@ -374,17 +374,20 @@ public class TestSaslRPC {
try {
proxy1 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy1.getAuthMethod();
Client client = WritableRpcEngine.getClient(conf);
Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// same conf, connection should be re-used
proxy2 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy2.getAuthMethod();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// different conf, new connection should be set up
newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_2);
proxy3 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy3.getAuthMethod();
ConnectionId[] connsArray = conns.toArray(new ConnectionId[0]);
assertEquals("number of connections in cache is wrong", 2,
connsArray.length);
Modified: hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java?rev=1128018&r1=1128017&r2=1128018&view=diff
==============================================================================
--- hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java (original)
+++ hadoop/common/branches/yahoo-merge/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java Thu May 26 18:25:20 2011
@@ -30,6 +30,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
@@ -134,9 +135,14 @@ public class TestDoAsEffectiveUser {
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
- // TODO Auto-generated method stub
return TestProtocol.versionID;
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return new ProtocolSignature(TestProtocol.versionID, null);
+ }
}
@Test
@@ -161,7 +167,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -203,7 +209,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -250,7 +256,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -289,7 +295,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -368,7 +374,7 @@ public class TestDoAsEffectiveUser {
String retVal = proxyUserUgi
.doAs(new PrivilegedExceptionAction<String>() {
public String run() throws IOException {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -424,7 +430,7 @@ public class TestDoAsEffectiveUser {
@Override
public String run() throws Exception {
try {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
String ret = proxy.aMethod();
return ret;
@@ -477,7 +483,7 @@ public class TestDoAsEffectiveUser {
@Override
public String run() throws Exception {
try {
- proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, newConf);
String ret = proxy.aMethod();
return ret;