You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/05/03 01:03:42 UTC

svn commit: r1098840 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/

Author: todd
Date: Mon May  2 23:03:42 2011
New Revision: 1098840

URL: http://svn.apache.org/viewvc?rev=1098840&view=rev
Log:
Revert HADOOP-7227 from r1098792 since it broke HDFS and MR builds.

( svn merge -c -1098792 )

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon May  2 23:03:42 2011
@@ -102,9 +102,6 @@ Trunk (unreleased changes)
     HADOOP-7235. Refactor the tail command to conform to new FsCommand class.
     (Daryn Sharp via szetszwo)
 
-    HADOOP-7227. Remove protocol version check at proxy creation in Hadoop
-    RPC. (jitendra)
-
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Mon May  2 23:03:42 2011
@@ -61,8 +61,6 @@ public class AvroRpcEngine implements Rp
 
   /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
   private static interface TunnelProtocol extends VersionedProtocol {
-    //WritableRpcEngine expects a versionID in every protocol.
-    public static final long versionID = 0L;
     /** All Avro methods and responses go through this. */
     BufferListWritable call(BufferListWritable request) throws IOException;
   }
@@ -149,7 +147,7 @@ public class AvroRpcEngine implements Rp
          protocol.getClassLoader(),
          new Class[] { protocol },
          new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)),
-       false);
+       null);
   }
 
   /** Stop this proxy. */

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Mon May  2 23:03:42 2011
@@ -34,54 +34,24 @@ public class ProtocolProxy<T> {
   private Class<T> protocol;
   private T proxy;
   private HashSet<Integer> serverMethods = null;
-  final private boolean supportServerMethodCheck;
-  private boolean serverMethodsFetched = false;
   
   /**
    * Constructor
    * 
    * @param protocol protocol class
    * @param proxy its proxy
-   * @param supportServerMethodCheck If false proxy will never fetch server
-   *        methods and isMethodSupported will always return true. If true,
-   *        server methods will be fetched for the first call to 
-   *        isMethodSupported. 
+   * @param serverMethods a list of hash codes of the methods that it supports
+   * @throws ClassNotFoundException 
    */
-  public ProtocolProxy(Class<T> protocol, T proxy,
-      boolean supportServerMethodCheck) {
+  public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) {
     this.protocol = protocol;
     this.proxy = proxy;
-    this.supportServerMethodCheck = supportServerMethodCheck;
-  }
-  
-  private void fetchServerMethods(Method method) throws IOException {
-    long clientVersion;
-    try {
-      clientVersion = method.getDeclaringClass().getField("versionID").getLong(
-          method.getDeclaringClass());
-    } catch (NoSuchFieldException ex) {
-      throw new RuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    }
-    int clientMethodsHash = ProtocolSignature.getFingerprint(method
-        .getDeclaringClass().getMethods());
-    ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
-        .getProtocolSignature(protocol.getName(), clientVersion,
-            clientMethodsHash);
-    long serverVersion = serverInfo.getVersion();
-    if (serverVersion != clientVersion) {
-      throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-          serverVersion);
-    }
-    int[] serverMethodsCodes = serverInfo.getMethods();
-    if (serverMethodsCodes != null) {
-      serverMethods = new HashSet<Integer>(serverMethodsCodes.length);
-      for (int m : serverMethodsCodes) {
-        this.serverMethods.add(Integer.valueOf(m));
+    if (serverMethods != null) {
+      this.serverMethods = new HashSet<Integer>(serverMethods.length);
+      for (int method : serverMethods) {
+        this.serverMethods.add(Integer.valueOf(method));
       }
     }
-    serverMethodsFetched = true;
   }
 
   /*
@@ -98,10 +68,10 @@ public class ProtocolProxy<T> {
    * @param parameterTypes a method's parameter types
    * @return true if the method is supported by the server
    */
-  public synchronized boolean isMethodSupported(String methodName,
+  public boolean isMethodSupported(String methodName,
                                    Class<?>... parameterTypes)
   throws IOException {
-    if (!supportServerMethodCheck) {
+    if (serverMethods == null) { // client & server have the same protocol
       return true;
     }
     Method method;
@@ -112,12 +82,6 @@ public class ProtocolProxy<T> {
     } catch (NoSuchMethodException e) {
       throw new IOException(e);
     }
-    if (!serverMethodsFetched) {
-      fetchServerMethods(method);
-    }
-    if (serverMethods == null) { // client & server have the same protocol
-      return true;
-    }
     return serverMethods.contains(
         Integer.valueOf(ProtocolSignature.getFingerprint(method)));
   }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon May  2 23:03:42 2011
@@ -46,10 +46,6 @@ import org.apache.hadoop.metrics.util.Me
 @InterfaceStability.Evolving
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
-  
-  //writableRpcVersion should be updated if there is a change
-  //in format of the rpc messages.
-  public static long writableRpcVersion = 1L;
 
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
@@ -57,12 +53,6 @@ public class WritableRpcEngine implement
     private Class<?>[] parameterClasses;
     private Object[] parameters;
     private Configuration conf;
-    private long clientVersion;
-    private int clientMethodsHash;
-    
-    //This could be different from static writableRpcVersion when received
-    //at server, if client is using a different version.
-    private long rpcVersion;
 
     public Invocation() {}
 
@@ -70,23 +60,6 @@ public class WritableRpcEngine implement
       this.methodName = method.getName();
       this.parameterClasses = method.getParameterTypes();
       this.parameters = parameters;
-      rpcVersion = writableRpcVersion;
-      if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
-        //VersionedProtocol is exempted from version check.
-        clientVersion = 0;
-        clientMethodsHash = 0;
-      } else {
-        try {
-          this.clientVersion = method.getDeclaringClass().getField("versionID")
-              .getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
-        this.clientMethodsHash = ProtocolSignature.getFingerprint(method
-            .getDeclaringClass().getMethods());
-      }
     }
 
     /** The name of the method invoked. */
@@ -97,28 +70,9 @@ public class WritableRpcEngine implement
 
     /** The parameter instances. */
     public Object[] getParameters() { return parameters; }
-    
-    private long getProtocolVersion() {
-      return clientVersion;
-    }
-
-    private int getClientMethodsHash() {
-      return clientMethodsHash;
-    }
-    
-    /**
-     * Returns the rpc version used by the client.
-     * @return rpcVersion
-     */
-    public long getRpcVersion() {
-      return rpcVersion;
-    }
 
     public void readFields(DataInput in) throws IOException {
-      rpcVersion = in.readLong();
       methodName = UTF8.readString(in);
-      clientVersion = in.readLong();
-      clientMethodsHash = in.readInt();
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
@@ -129,10 +83,7 @@ public class WritableRpcEngine implement
     }
 
     public void write(DataOutput out) throws IOException {
-      out.writeLong(rpcVersion);
       UTF8.writeString(out, methodName);
-      out.writeLong(clientVersion);
-      out.writeInt(clientMethodsHash);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
         ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -150,9 +101,6 @@ public class WritableRpcEngine implement
         buffer.append(parameters[i]);
       }
       buffer.append(")");
-      buffer.append(", rpc version="+rpcVersion);
-      buffer.append(", client version="+clientVersion);
-      buffer.append(", methodsFingerPrint="+clientMethodsHash);
       return buffer.toString();
     }
 
@@ -282,10 +230,22 @@ public class WritableRpcEngine implement
                          int rpcTimeout)
     throws IOException {    
 
-    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
-        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout));
-    return new ProtocolProxy<T>(protocol, proxy, true);
+    T proxy = (T)Proxy.newProxyInstance
+      (protocol.getClassLoader(), new Class[] { protocol },
+       new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
+    int[] serverMethods = null;
+    if (proxy instanceof VersionedProtocol) {
+      ProtocolSignature serverInfo = ((VersionedProtocol)proxy)
+        .getProtocolSignature(protocol.getName(), clientVersion,
+            ProtocolSignature.getFingerprint(protocol.getMethods()));
+      long serverVersion = serverInfo.getVersion();
+      if (serverVersion != clientVersion) {
+        throw new RPC.VersionMismatch(protocol.getName(), clientVersion, 
+                                      serverVersion);
+      }
+      serverMethods = serverInfo.getMethods();
+    }
+    return new ProtocolProxy<T>(protocol, proxy, serverMethods);
   }
 
   /**
@@ -393,31 +353,6 @@ public class WritableRpcEngine implement
                                    call.getParameterClasses());
         method.setAccessible(true);
 
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
-        
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
-          long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
-          }
-        }
-
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Mon May  2 23:03:42 2011
@@ -290,7 +290,8 @@ public class TestRPC extends TestCase {
     // Check rpcMetrics 
     server.rpcMetrics.doUpdates(new NullContext());
     
-    assertEquals(3, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+    // Number 4 includes getProtocolVersion()
+    assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
     assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
     assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
     
@@ -375,9 +376,8 @@ public class TestRPC extends TestCase {
   
   public void testStandaloneClient() throws IOException {
     try {
-      TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
+      RPC.waitForProxy(TestProtocol.class,
         TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
-      proxy.echo("");
       fail("We should not have reached here");
     } catch (ConnectException ioe) {
       //this is what we expected
@@ -502,7 +502,6 @@ public class TestRPC extends TestCase {
     try {
       proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, addr, conf);
-      proxy.echo("");
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
       assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
@@ -528,7 +527,6 @@ public class TestRPC extends TestCase {
     try {
       proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, mulitServerAddr, conf);
-      proxy.echo("");
     } catch (RemoteException e) {
       LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
       assertTrue(e.unwrapRemoteException() instanceof AccessControlException);

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Mon May  2 23:03:42 2011
@@ -18,21 +18,19 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
-import junit.framework.Assert;
+import org.apache.commons.logging.*;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+
 import org.junit.After;
+
 import org.junit.Test;
 
 /** Unit test for supporting method-name based compatible RPCs. */
@@ -249,26 +247,4 @@ public class TestRPCCompatibility {
     int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod});
     assertEquals(hash1, hash2);
   }
-  
-  public interface TestProtocol4 extends TestProtocol2 {
-    public static final long versionID = 1L;
-    int echo(int value)  throws IOException;
-  }
-  
-  @Test
-  public void testVersionMismatch() throws IOException {
-    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
-        false, conf, null);
-    server.start();
-    addr = NetUtils.getConnectAddress(server);
-
-    TestProtocol4 proxy = RPC.getProxy(TestProtocol4.class,
-        TestProtocol4.versionID, addr, conf);
-    try {
-      proxy.echo(21);
-      fail("The call must throw VersionMismatch exception");
-    } catch (IOException ex) {
-      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
-    }
-  }
 }
\ No newline at end of file

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1098840&r1=1098839&r2=1098840&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java Mon May  2 23:03:42 2011
@@ -321,20 +321,17 @@ public class TestSaslRPC {
     try {
       proxy1 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
-      proxy1.getAuthMethod();
       Client client = WritableRpcEngine.getClient(conf);
       Set<ConnectionId> conns = client.getConnectionIds();
       assertEquals("number of connections in cache is wrong", 1, conns.size());
       // same conf, connection should be re-used
       proxy2 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
-      proxy2.getAuthMethod();
       assertEquals("number of connections in cache is wrong", 1, conns.size());
       // different conf, new connection should be set up
       newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_2);
       proxy3 = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
           TestSaslProtocol.versionID, addr, newConf);
-      proxy3.getAuthMethod();
       ConnectionId[] connsArray = conns.toArray(new ConnectionId[0]);
       assertEquals("number of connections in cache is wrong", 2,
           connsArray.length);