You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/05/03 01:03:42 UTC
svn commit: r1098840 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/
Author: todd
Date: Mon May 2 23:03:42 2011
New Revision: 1098840
URL: http://svn.apache.org/viewvc?rev=1098840&view=rev
Log:
Revert HADOOP-7227 from r1098792 since it broke HDFS and MR builds.
( svn merge -c -1098792 )
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon May 2 23:03:42 2011
@@ -102,9 +102,6 @@ Trunk (unreleased changes)
HADOOP-7235. Refactor the tail command to conform to new FsCommand class.
(Daryn Sharp via szetszwo)
- HADOOP-7227. Remove protocol version check at proxy creation in Hadoop
- RPC. (jitendra)
-
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Mon May 2 23:03:42 2011
@@ -61,8 +61,6 @@ public class AvroRpcEngine implements Rp
/** Tunnel an Avro RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol {
- //WritableRpcEngine expects a versionID in every protocol.
- public static final long versionID = 0L;
/** All Avro methods and responses go through this. */
BufferListWritable call(BufferListWritable request) throws IOException;
}
@@ -149,7 +147,7 @@ public class AvroRpcEngine implements Rp
protocol.getClassLoader(),
new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
- false);
+ null);
}
/** Stop this proxy. */
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Mon May 2 23:03:42 2011
@@ -34,54 +34,24 @@ public class ProtocolProxy<T> {
private Class<T> protocol;
private T proxy;
private HashSet<Integer> serverMethods = null;
- final private boolean supportServerMethodCheck;
- private boolean serverMethodsFetched = false;
/**
* Constructor
*
* @param protocol protocol class
* @param proxy its proxy
- * @param supportServerMethodCheck If false proxy will never fetch server
- * methods and isMethodSupported will always return true. If true,
- * server methods will be fetched for the first call to
- * isMethodSupported.
+ * @param serverMethods a list of hash codes of the methods that it supports
+ * @throws ClassNotFoundException
*/
- public ProtocolProxy(Class<T> protocol, T proxy,
- boolean supportServerMethodCheck) {
+ public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) {
this.protocol = protocol;
this.proxy = proxy;
- this.supportServerMethodCheck = supportServerMethodCheck;
- }
-
- private void fetchServerMethods(Method method) throws IOException {
- long clientVersion;
- try {
- clientVersion = method.getDeclaringClass().getField("versionID").getLong(
- method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
- int clientMethodsHash = ProtocolSignature.getFingerprint(method
- .getDeclaringClass().getMethods());
- ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
- .getProtocolSignature(protocol.getName(), clientVersion,
- clientMethodsHash);
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- int[] serverMethodsCodes = serverInfo.getMethods();
- if (serverMethodsCodes != null) {
- serverMethods = new HashSet<Integer>(serverMethodsCodes.length);
- for (int m : serverMethodsCodes) {
- this.serverMethods.add(Integer.valueOf(m));
+ if (serverMethods != null) {
+ this.serverMethods = new HashSet<Integer>(serverMethods.length);
+ for (int method : serverMethods) {
+ this.serverMethods.add(Integer.valueOf(method));
}
}
- serverMethodsFetched = true;
}
/*
@@ -98,10 +68,10 @@ public class ProtocolProxy<T> {
* @param parameterTypes a method's parameter types
* @return true if the method is supported by the server
*/
- public synchronized boolean isMethodSupported(String methodName,
+ public boolean isMethodSupported(String methodName,
Class<?>... parameterTypes)
throws IOException {
- if (!supportServerMethodCheck) {
+ if (serverMethods == null) { // client & server have the same protocol
return true;
}
Method method;
@@ -112,12 +82,6 @@ public class ProtocolProxy<T> {
} catch (NoSuchMethodException e) {
throw new IOException(e);
}
- if (!serverMethodsFetched) {
- fetchServerMethods(method);
- }
- if (serverMethods == null) { // client & server have the same protocol
- return true;
- }
return serverMethods.contains(
Integer.valueOf(ProtocolSignature.getFingerprint(method)));
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon May 2 23:03:42 2011
@@ -46,10 +46,6 @@ import org.apache.hadoop.metrics.util.Me
@InterfaceStability.Evolving
public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
-
- //writableRpcVersion should be updated if there is a change
- //in format of the rpc messages.
- public static long writableRpcVersion = 1L;
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
@@ -57,12 +53,6 @@ public class WritableRpcEngine implement
private Class<?>[] parameterClasses;
private Object[] parameters;
private Configuration conf;
- private long clientVersion;
- private int clientMethodsHash;
-
- //This could be different from static writableRpcVersion when received
- //at server, if client is using a different version.
- private long rpcVersion;
public Invocation() {}
@@ -70,23 +60,6 @@ public class WritableRpcEngine implement
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
- rpcVersion = writableRpcVersion;
- if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
- //VersionedProtocol is exempted from version check.
- clientVersion = 0;
- clientMethodsHash = 0;
- } else {
- try {
- this.clientVersion = method.getDeclaringClass().getField("versionID")
- .getLong(method.getDeclaringClass());
- } catch (NoSuchFieldException ex) {
- throw new RuntimeException(ex);
- } catch (IllegalAccessException ex) {
- throw new RuntimeException(ex);
- }
- this.clientMethodsHash = ProtocolSignature.getFingerprint(method
- .getDeclaringClass().getMethods());
- }
}
/** The name of the method invoked. */
@@ -97,28 +70,9 @@ public class WritableRpcEngine implement
/** The parameter instances. */
public Object[] getParameters() { return parameters; }
-
- private long getProtocolVersion() {
- return clientVersion;
- }
-
- private int getClientMethodsHash() {
- return clientMethodsHash;
- }
-
- /**
- * Returns the rpc version used by the client.
- * @return rpcVersion
- */
- public long getRpcVersion() {
- return rpcVersion;
- }
public void readFields(DataInput in) throws IOException {
- rpcVersion = in.readLong();
methodName = UTF8.readString(in);
- clientVersion = in.readLong();
- clientMethodsHash = in.readInt();
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
@@ -129,10 +83,7 @@ public class WritableRpcEngine implement
}
public void write(DataOutput out) throws IOException {
- out.writeLong(rpcVersion);
UTF8.writeString(out, methodName);
- out.writeLong(clientVersion);
- out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -150,9 +101,6 @@ public class WritableRpcEngine implement
buffer.append(parameters[i]);
}
buffer.append(")");
- buffer.append(", rpc version="+rpcVersion);
- buffer.append(", client version="+clientVersion);
- buffer.append(", methodsFingerPrint="+clientMethodsHash);
return buffer.toString();
}
@@ -282,10 +230,22 @@ public class WritableRpcEngine implement
int rpcTimeout)
throws IOException {
- T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
- factory, rpcTimeout));
- return new ProtocolProxy<T>(protocol, proxy, true);
+ T proxy = (T)Proxy.newProxyInstance
+ (protocol.getClassLoader(), new Class[] { protocol },
+ new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+ int[] serverMethods = null;
+ if (proxy instanceof VersionedProtocol) {
+ ProtocolSignature serverInfo = ((VersionedProtocol)proxy)
+ .getProtocolSignature(protocol.getName(), clientVersion,
+ ProtocolSignature.getFingerprint(protocol.getMethods()));
+ long serverVersion = serverInfo.getVersion();
+ if (serverVersion != clientVersion) {
+ throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ serverMethods = serverInfo.getMethods();
+ }
+ return new ProtocolProxy<T>(protocol, proxy, serverMethods);
}
/**
@@ -393,31 +353,6 @@ public class WritableRpcEngine implement
call.getParameterClasses());
method.setAccessible(true);
- // Verify rpc version
- if (call.getRpcVersion() != writableRpcVersion) {
- // Client is using a different version of WritableRpc
- throw new IOException(
- "WritableRpc version mismatch, client side version="
- + call.getRpcVersion() + ", server side version="
- + writableRpcVersion);
- }
-
- //Verify protocol version.
- //Bypass the version check for VersionedProtocol
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
- long clientVersion = call.getProtocolVersion();
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
- .getProtocolSignature(protocol.getCanonicalName(), call
- .getProtocolVersion(), call.getClientMethodsHash());
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- LOG.warn("Version mismatch: client version=" + clientVersion
- + ", server version=" + serverVersion);
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
-
long startTime = System.currentTimeMillis();
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Mon May 2 23:03:42 2011
@@ -290,7 +290,8 @@ public class TestRPC extends TestCase {
// Check rpcMetrics
server.rpcMetrics.doUpdates(new NullContext());
- assertEquals(3, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+ // Number 4 includes getProtocolVersion()
+ assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
@@ -375,9 +376,8 @@ public class TestRPC extends TestCase {
public void testStandaloneClient() throws IOException {
try {
- TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+ RPC.waitForProxy(TestProtocol.class,
TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
- proxy.echo("");
fail("We should not have reached here");
} catch (ConnectException ioe) {
//this is what we expected
@@ -502,7 +502,6 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, addr, conf);
- proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
@@ -528,7 +527,6 @@ public class TestRPC extends TestCase {
try {
proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, mulitServerAddr, conf);
- proxy.echo("");
} catch (RemoteException e) {
LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon May 2 23:03:42 2011
@@ -18,21 +18,19 @@
package org.apache.hadoop.ipc;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
-import junit.framework.Assert;
+import org.apache.commons.logging.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+
import org.junit.After;
+
import org.junit.Test;
/** Unit test for supporting method-name based compatible RPCs. */
@@ -249,26 +247,4 @@ public class TestRPCCompatibility {
int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
assertEquals(hash1, hash2);
}
-
- public interface TestProtocol4 extends TestProtocol2 {
- public static final long versionID = 1L;
- int echo(int value) throws IOException;
- }
-
- @Test
- public void testVersionMismatch() throws IOException {
- server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
- false, conf, null);
- server.start();
- addr = NetUtils.getConnectAddress(server);
-
- TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
- TestProtocol4.versionID, addr, conf);
- try {
- proxy.echo(21);
- fail("The call must throw VersionMismatch exception");
- } catch (IOException ex) {
- Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
- }
- }
}
\ No newline at end of file
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java Mon May 2 23:03:42 2011
@@ -321,20 +321,17 @@ public class TestSaslRPC {
try {
proxy1 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
- proxy1.getAuthMethod();
Client client = WritableRpcEngine.getClient(conf);
Set<ConnectionId> conns = client.getConnectionIds();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// same conf, connection should be re-used
proxy2 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
- proxy2.getAuthMethod();
assertEquals("number of connections in cache is wrong", 1, conns.size());
// different conf, new connection should be set up
newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_2);
proxy3 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
TestSaslProtocol.versionID, addr, newConf);
- proxy3.getAuthMethod();
ConnectionId[] connsArray = conns.toArray(new ConnectionId[0]);
assertEquals("number of connections in cache is wrong", 2,
connsArray.length);