You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2011/05/04 00:16:14 UTC
svn commit: r1099284 - in /hadoop/common/trunk: ./
src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/
Author: jitendra
Date: Tue May 3 22:16:14 2011
New Revision: 1099284
URL: http://svn.apache.org/viewvc?rev=1099284&view=rev
Log:
HADOOP-7227. Remove protocol version check at proxy creation in Hadoop RPC. Contributed by jitendra.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue May 3 22:16:14 2011
@@ -105,6 +105,9 @@ Trunk (unreleased changes)
HADOOP-7179. Federation: Improve HDFS startup scripts. (Erik Steffl
and Tanping Wang via suresh)
+ HADOOP-7227. Remove protocol version check at proxy creation in Hadoop
+ RPC. (jitendra)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Tue May 3 22:16:14 2011
@@ -61,6 +61,8 @@ public class AvroRpcEngine implements Rp
/** Tunnel an Avro RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol {
+ //WritableRpcEngine expects a versionID in every protocol.
+ public static final long versionID = 0L;
/** All Avro methods and responses go through this. */
BufferListWritable call(BufferListWritable request) throws IOException;
}
@@ -147,7 +149,7 @@ public class AvroRpcEngine implements Rp
protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
- null);
+ false);
}
/** Stop this proxy. */
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Tue May 3 22:16:14 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashSet;
@@ -34,24 +35,55 @@ public class ProtocolProxy<T> {
private Class<T> protocol;
private T proxy;
private HashSet<Integer> serverMethods = null;
+ final private boolean supportServerMethodCheck;
+ private boolean serverMethodsFetched = false;
/**
* Constructor
*
* @param protocol protocol class
* @param proxy its proxy
- * @param serverMethods a list of hash codes of the methods that it supports
- * @throws ClassNotFoundException
+ * @param supportServerMethodCheck If false proxy will never fetch server
+ * methods and isMethodSupported will always return true. If true,
+ * server methods will be fetched for the first call to
+ * isMethodSupported.
*/
- public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) {
+ public ProtocolProxy(Class<T> protocol, T proxy,
+ boolean supportServerMethodCheck) {
this.protocol = protocol;
this.proxy = proxy;
- if (serverMethods != null) {
- this.serverMethods = new HashSet<Integer>(serverMethods.length);
- for (int method : serverMethods) {
- this.serverMethods.add(Integer.valueOf(method));
+ this.supportServerMethodCheck = supportServerMethodCheck;
+ }
+
+ private void fetchServerMethods(Method method) throws IOException {
+ long clientVersion;
+ try {
+ Field versionField = method.getDeclaringClass().getField("versionID");
+ versionField.setAccessible(true);
+ clientVersion = versionField.getLong(method.getDeclaringClass());
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ int clientMethodsHash = ProtocolSignature.getFingerprint(method
+ .getDeclaringClass().getMethods());
+ ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
+ .getProtocolSignature(protocol.getName(), clientVersion,
+ clientMethodsHash);
+ long serverVersion = serverInfo.getVersion();
+ if (serverVersion != clientVersion) {
+ throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ int[] serverMethodsCodes = serverInfo.getMethods();
+ if (serverMethodsCodes != null) {
+ serverMethods = new HashSet<Integer>(serverMethodsCodes.length);
+ for (int m : serverMethodsCodes) {
+ this.serverMethods.add(Integer.valueOf(m));
}
}
+ serverMethodsFetched = true;
}
/*
@@ -68,10 +100,10 @@ public class ProtocolProxy<T> {
* @param parameterTypes a method's parameter types
* @return true if the method is supported by the server
*/
- public boolean isMethodSupported(String methodName,
+ public synchronized boolean isMethodSupported(String methodName,
Class<?>... parameterTypes)
throws IOException {
- if (serverMethods == null) { // client & server have the same protocol
+ if (!supportServerMethodCheck) {
return true;
}
Method method;
@@ -82,6 +114,12 @@ public class ProtocolProxy<T> {
} catch (NoSuchMethodException e) {
throw new IOException(e);
}
+ if (!serverMethodsFetched) {
+ fetchServerMethods(method);
+ }
+ if (serverMethods == null) { // client & server have the same protocol
+ return true;
+ }
return serverMethods.contains(
Integer.valueOf(ProtocolSignature.getFingerprint(method)));
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue May 3 22:16:14 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.ipc;
+import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
@@ -46,6 +47,10 @@ import org.apache.hadoop.metrics.util.Me
@InterfaceStability.Evolving
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
+
+ //writableRpcVersion should be updated if there is a change
+ //in format of the rpc messages.
+ public static long writableRpcVersion = 1L;
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
@@ -53,6 +58,12 @@ public class WritableRpcEngine implement
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
+ private long clientVersion;
+ private int clientMethodsHash;
+
+ //This could be different from static writableRpcVersion when received
+ //at server, if client is using a different version.
+ private long rpcVersion;
public Invocation() {}
@@ -60,6 +71,24 @@ public class WritableRpcEngine implement
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
+ rpcVersion = writableRpcVersion;
+ if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ //VersionedProtocol is exempted from version check.
+ clientVersion = 0;
+ clientMethodsHash = 0;
+ } else {
+ try {
+ Field versionField = method.getDeclaringClass().getField("versionID");
+ versionField.setAccessible(true);
+ this.clientVersion = versionField.getLong(method.getDeclaringClass());
+ } catch (NoSuchFieldException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ this.clientMethodsHash = ProtocolSignature.getFingerprint(method
+ .getDeclaringClass().getMethods());
+ }
}
/** The name of the method invoked. */
@@ -70,9 +99,28 @@ public class WritableRpcEngine implement
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
+
+ private long getProtocolVersion() {
+ return clientVersion;
+ }
+
+ private int getClientMethodsHash() {
+ return clientMethodsHash;
+ }
+
+ /**
+ * Returns the rpc version used by the client.
+ * @return rpcVersion
+ */
+ public long getRpcVersion() {
+ return rpcVersion;
+ }
public void readFields(DataInput in) throws IOException {
+ rpcVersion = in.readLong();
methodName = UTF8.readString(in);
+ clientVersion = in.readLong();
+ clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
@@ -83,7 +131,10 @@ public class WritableRpcEngine implement
}
public void write(DataOutput out) throws IOException {
+ out.writeLong(rpcVersion);
UTF8.writeString(out, methodName);
+ out.writeLong(clientVersion);
+ out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -101,6 +152,9 @@ public class WritableRpcEngine implement
buffer.append(parameters[i]);
}
buffer.append(")");
+ buffer.append(", rpc version="+rpcVersion);
+ buffer.append(", client version="+clientVersion);
+ buffer.append(", methodsFingerPrint="+clientMethodsHash);
return buffer.toString();
}
@@ -230,22 +284,10 @@ public class WritableRpcEngine implement
int rpcTimeout)
throws IOException {
- T proxy = (T)Proxy.newProxyInstance
- (protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- int[] serverMethods = null;
- if (proxy instanceof VersionedProtocol) {
- ProtocolSignature serverInfo = ((VersionedProtocol)proxy)
- .getProtocolSignature(protocol.getName(), clientVersion,
- ProtocolSignature.getFingerprint(protocol.getMethods()));
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- serverMethods = serverInfo.getMethods();
- }
- return new ProtocolProxy<T>(protocol, proxy, serverMethods);
+ T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
+ new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+ factory, rpcTimeout));
+ return new ProtocolProxy<T>(protocol, proxy, true);
}
/**
@@ -353,6 +395,31 @@ public class WritableRpcEngine implement
call.getParameterClasses());
method.setAccessible(true);
+ // Verify rpc version
+ if (call.getRpcVersion() != writableRpcVersion) {
+ // Client is using a different version of WritableRpc
+ throw new IOException(
+ "WritableRpc version mismatch, client side version="
+ + call.getRpcVersion() + ", server side version="
+ + writableRpcVersion);
+ }
+
+ //Verify protocol version.
+ //Bypass the version check for VersionedProtocol
+ if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ long clientVersion = call.getProtocolVersion();
+ ProtocolSignature serverInfo = ((VersionedProtocol) instance)
+ .getProtocolSignature(protocol.getCanonicalName(), call
+ .getProtocolVersion(), call.getClientMethodsHash());
+ long serverVersion = serverInfo.getVersion();
+ if (serverVersion != clientVersion) {
+ LOG.warn("Version mismatch: client version=" + clientVersion
+ + ", server version=" + serverVersion);
+ throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ }
+
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Tue May 3 22:16:14 2011
@@ -290,8 +290,7 @@ public class TestRPC extends TestCase {
// Check rpcMetrics
server.rpcMetrics.doUpdates(new NullContext());
- // Number 4 includes getProtocolVersion()
- assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+ assertEquals(3, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
@@ -376,8 +375,9 @@ public class TestRPC extends TestCase {
public void testStandaloneClient() throws IOException {
try {
- RPC.waitForProxy(TestProtocol.class,
+ TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
+ proxy.echo("");
fail("We should not have reached here");
} catch (ConnectException ioe) {
//this is what we expected
@@ -502,6 +502,7 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
+ proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
@@ -527,6 +528,7 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, mulitServerAddr, conf);
+ proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Tue May 3 22:16:14 2011
@@ -18,19 +18,21 @@
package org.apache.hadoop.ipc;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
-import org.apache.commons.logging.*;
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
-
import org.junit.After;
-
import org.junit.Test;
/** Unit test for supporting method-name based compatible RPCs. */
@@ -247,4 +249,26 @@ public class TestRPCCompatibility {
int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
assertEquals(hash1, hash2);
}
+
+ public interface TestProtocol4 extends TestProtocol2 {
+ public static final long versionID = 1L;
+ int echo(int value) throws IOException;
+ }
+
+ @Test
+ public void testVersionMismatch() throws IOException {
+ server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+ false, conf, null);
+ server.start();
+ addr = NetUtils.getConnectAddress(server);
+
+ TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
+ TestProtocol4.versionID, addr, conf);
+ try {
+ proxy.echo(21);
+ fail("The call must throw VersionMismatch exception");
+ } catch (IOException ex) {
+ Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+ }
+ }
}
\ No newline at end of file
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1099284&r1=1099283&r2=1099284&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java Tue May 3 22:16:14 2011
@@ -321,17 +321,20 @@ public class TestSaslRPC {
try {
proxy1 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy1.getAuthMethod();
Client client = WritableRpcEngine.getClient(conf);
Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// same conf, connection should be re-used
proxy2 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy2.getAuthMethod();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// different conf, new connection should be set up
newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_2);
proxy3 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
+ proxy3.getAuthMethod();
ConnectionId[] connsArray = conns.toArray(new ConnectionId[0]);
assertEquals("number of connections in cache is wrong", 2,
connsArray.length);