You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2011/09/03 02:31:06 UTC

svn commit: r1164771 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/

Author: sradia
Date: Sat Sep  3 00:31:05 2011
New Revision: 1164771

URL: http://svn.apache.org/viewvc?rev=1164771&view=rev
Log:
  HADOOP-7524 and MapReduce-2887 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sat Sep  3 00:31:05 2011
@@ -5,6 +5,7 @@ Trunk (unreleased changes)
   IMPROVEMENTS
 
   HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm)
+  HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
 
 Release 0.23.0 - Unreleased
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sat Sep  3 00:31:05 2011
@@ -285,8 +285,8 @@ public class Client {
         authMethod = AuthMethod.KERBEROS;
       }
       
-      header = new ConnectionHeader(protocol == null ? null : protocol
-          .getName(), ticket, authMethod);
+      header = 
+        new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
       
       if (LOG.isDebugEnabled())
         LOG.debug("Use " + authMethod + " authentication for protocol "

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1164771&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Sat Sep  3 00:31:05 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+
+/**
+ * The protocol name that is used when a client and server connect.
+ * By default the class name of the protocol interface is the protocol name.
+ * 
+ * Why override the default name (i.e. the class name)?
+ * One use case overriding the default name (i.e. the class name) is when
+ * there are multiple implementations of the same protocol, each with say a
+ *  different version/serialization.
+ * In Hadoop this is used to allow multiple server and client adapters
+ * for different versions of the same protocol service.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ProtocolInfo {
+  String protocolName();  // the name of the protocol (i.e. rpc service)
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sat Sep  3 00:31:05 2011
@@ -62,6 +62,20 @@ import org.apache.hadoop.util.Reflection
  */
 public class RPC {
   static final Log LOG = LogFactory.getLog(RPC.class);
+  
+  
+  /**
+   * Get the protocol name.
+   *  If the protocol class has a ProtocolAnnotation, then get the protocol
+   *  name from the annotation; otherwise the class name is the protocol name.
+   */
+  static public String getProtocolName(Class<?> protocol) {
+    if (protocol == null) {
+      return null;
+    }
+    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    return  (anno == null) ? protocol.getName() : anno.protocolName();
+  }
 
   private RPC() {}                                  // no public ctor
 
@@ -553,8 +567,10 @@ public class RPC {
   }
 
   /** Construct a server for a protocol implementation instance. */
-  public static Server getServer(Class<?> protocol,
-                                 Object instance, String bindAddress, int port,
+
+  public static <PROTO extends VersionedProtocol, IMPL extends PROTO> 
+        Server getServer(Class<PROTO> protocol,
+                                 IMPL instance, String bindAddress, int port,
                                  int numHandlers, int numReaders, int queueSizePerHandler,
                                  boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager) 
@@ -576,6 +592,18 @@ public class RPC {
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
             conf, serverName, secretManager);
     }
+    
+    /**
+     * Add a protocol to the existing server.
+     * @param protocolClass - the protocol class
+     * @param protocolImpl - the impl of the protocol that will be called
+     * @return the server (for convenience)
+     */
+    public <PROTO extends VersionedProtocol, IMPL extends PROTO>
+      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
+    ) throws IOException {
+      throw new IOException("addProtocol Not Implemented");
+    }
   }
 
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sat Sep  3 00:31:05 2011
@@ -900,7 +900,7 @@ public abstract class Server {
     private InetAddress addr;
     
     ConnectionHeader header = new ConnectionHeader();
-    Class<?> protocol;
+    String protocolName;
     boolean useSasl;
     SaslServer saslServer;
     private AuthMethod authMethod;
@@ -1287,15 +1287,8 @@ public abstract class Server {
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(buf));
       header.readFields(in);
-      try {
-        String protocolClassName = header.getProtocol();
-        if (protocolClassName != null) {
-          protocol = getProtocolClass(header.getProtocol(), conf);
-          rpcDetailedMetrics.init(protocol);
-        }
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown protocol: " + header.getProtocol());
-      }
+      protocolName = header.getProtocol();
+
       
       UserGroupInformation protocolUser = header.getUgi();
       if (!useSasl) {
@@ -1484,7 +1477,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocol, call.param, 
+              value = call(call.connection.protocolName, call.param, 
                            call.timestamp);
             } else {
               value = 
@@ -1493,7 +1486,7 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocol, 
+                       return call(call.connection.protocolName, 
                                    call.param, call.timestamp);
 
                      }
@@ -1753,7 +1746,7 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   * @deprecated Use {@link #call(String, Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
@@ -1761,7 +1754,7 @@ public abstract class Server {
   }
   
   /** Called for each call. */
-  public abstract Writable call(Class<?> protocol,
+  public abstract Writable call(String protocol,
                                Writable param, long receiveTime)
   throws IOException;
   

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/VersionedProtocol.java Sat Sep  3 00:31:05 2011
@@ -34,7 +34,6 @@ public interface VersionedProtocol {
    * @return the version that the server will speak
    * @throws IOException if any IO error occurs
    */
-  @Deprecated
   public long getProtocolVersion(String protocol,
                                  long clientVersion) throws IOException;
 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sat Sep  3 00:31:05 2011
@@ -27,6 +27,9 @@ import java.lang.reflect.InvocationTarge
 
 import java.net.InetSocketAddress;
 import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -35,6 +38,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -47,10 +51,46 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
+ 
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
+
+  
   //writableRpcVersion should be updated if there is a change
   //in format of the rpc messages.
-  public static long writableRpcVersion = 1L;
+  
+  // 2L - added declared class to Invocation
+  public static final long writableRpcVersion = 2L; 
 
+  
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
     private String methodName;
@@ -59,11 +99,13 @@ public class WritableRpcEngine implement
     private Configuration conf;
     private long clientVersion;
     private int clientMethodsHash;
+    private String declaringClassProtocolName;
     
     //This could be different from static writableRpcVersion when received
     //at server, if client is using a different version.
     private long rpcVersion;
 
+    @SuppressWarnings("unused") // called when deserializing an invocation
     public Invocation() {}
 
     public Invocation(Method method, Object[] parameters) {
@@ -88,6 +130,8 @@ public class WritableRpcEngine implement
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
+      this.declaringClassProtocolName = 
+          RPC.getProtocolName(method.getDeclaringClass());
     }
 
     /** The name of the method invoked. */
@@ -103,6 +147,7 @@ public class WritableRpcEngine implement
       return clientVersion;
     }
 
+    @SuppressWarnings("unused")
     private int getClientMethodsHash() {
       return clientMethodsHash;
     }
@@ -115,8 +160,10 @@ public class WritableRpcEngine implement
       return rpcVersion;
     }
 
+    @SuppressWarnings("deprecation")
     public void readFields(DataInput in) throws IOException {
       rpcVersion = in.readLong();
+      declaringClassProtocolName = UTF8.readString(in);
       methodName = UTF8.readString(in);
       clientVersion = in.readLong();
       clientMethodsHash = in.readInt();
@@ -124,13 +171,16 @@ public class WritableRpcEngine implement
       parameterClasses = new Class[parameters.length];
       ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = 
+            ObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
+    @SuppressWarnings("deprecation")
     public void write(DataOutput out) throws IOException {
       out.writeLong(rpcVersion);
+      UTF8.writeString(out, declaringClassProtocolName);
       UTF8.writeString(out, methodName);
       out.writeLong(clientVersion);
       out.writeInt(clientMethodsHash);
@@ -273,30 +323,161 @@ public class WritableRpcEngine implement
 
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
-  public Server getServer(Class<?> protocol,
-                          Object instance, String bindAddress, int port,
-                          int numHandlers, int numReaders, int queueSizePerHandler,
-                          boolean verbose, Configuration conf,
+  public RPC.Server getServer(Class<?> protocolClass,
+                      Object protocolImpl, String bindAddress, int port,
+                      int numHandlers, int numReaders, int queueSizePerHandler,
+                      boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers, 
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
 
+
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private Object instance;
     private boolean verbose;
+    
+    /**
+     *  The key in Map
+     */
+    static class ProtoNameVer {
+      final String protocol;
+      final long   version;
+      ProtoNameVer(String protocol, long ver) {
+        this.protocol = protocol;
+        this.version = ver;
+      }
+      @Override
+      public boolean equals(Object o) {
+        if (o == null) 
+          return false;
+        if (this == o) 
+          return true;
+        if (! (o instanceof ProtoNameVer))
+          return false;
+        ProtoNameVer pv = (ProtoNameVer) o;
+        return ((pv.protocol.equals(this.protocol)) && 
+            (pv.version == this.version));     
+      }
+      @Override
+      public int hashCode() {
+        return protocol.hashCode() * 37 + (int) version;    
+      }
+    }
+    
+    /**
+     * The value in map
+     */
+    static class ProtoClassProtoImpl {
+      final Class<?> protocolClass;
+      final Object protocolImpl; 
+      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+        this.protocolClass = protocolClass;
+        this.protocolImpl = protocolImpl;
+      }
+    }
+    
+    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
+        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
+    
+    // Register  protocol and its impl for rpc calls
+    private void registerProtocolAndImpl(Class<?> protocolClass, 
+        Object protocolImpl) throws IOException {
+      String protocolName = RPC.getProtocolName(protocolClass);
+      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
+      long version;
+      try {
+        version = vp.getProtocolVersion(protocolName, 0);
+      } catch (Exception ex) {
+        LOG.warn("Protocol "  + protocolClass + 
+             " NOT registered as getProtocolVersion throws exception ");
+        return;
+      }
+      protocolImplMap.put(new ProtoNameVer(protocolName, version),
+          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+      LOG.info("ProtocolImpl=" + protocolImpl.getClass().getName() + 
+          " protocolClass=" + protocolClass.getName() + " version=" + version);
+    }
+    
+    private static class VerProtocolImpl {
+      final long version;
+      final ProtoClassProtoImpl protocolTarget;
+      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+        this.version = ver;
+        this.protocolTarget = protocolTarget;
+      }
+    }
+    
+    
+    @SuppressWarnings("unused") // will be useful later.
+    private VerProtocolImpl[] getSupportedProtocolVersions(
+        String protocolName) {
+      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
+      int i = 0;
+      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                        protocolImplMap.entrySet()) {
+        if (pv.getKey().protocol.equals(protocolName)) {
+          resultk[i++] = 
+              new VerProtocolImpl(pv.getKey().version, pv.getValue());
+        }
+      }
+      if (i == 0) {
+        return null;
+      }
+      VerProtocolImpl[] result = new VerProtocolImpl[i];
+      System.arraycopy(resultk, 0, result, 0, i);
+      return result;
+    }
+    
+    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
+      Long highestVersion = 0L;
+      ProtoClassProtoImpl highest = null;
+      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
+          .entrySet()) {
+        if (pv.getKey().protocol.equals(protocolName)) {
+          if ((highest == null) || (pv.getKey().version > highestVersion)) {
+            highest = pv.getValue();
+            highestVersion = pv.getKey().version;
+          } 
+        }
+      }
+      if (highest == null) {
+        return null;
+      }
+      return new VerProtocolImpl(highestVersion,  highest);   
+    }
+ 
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
+     * 
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)
+     *    
+     */
+    @Deprecated
+    public Server(Object instance, Configuration conf, String bindAddress,
+        int port) 
+      throws IOException {
+      this(null, instance, conf,  bindAddress, port);
+    }
+    
+    
+    /** Construct an RPC server.
+     * @param protocol class
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
      */
-    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+    public Server(Class<?> protocolClass, Object protocolImpl, 
+        Configuration conf, String bindAddress, int port) 
       throws IOException {
-      this(instance, conf,  bindAddress, port, 1, -1, -1, false, null);
+      this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
+          false, null);
     }
     
     private static String classNameBase(String className) {
@@ -307,35 +488,103 @@ public class WritableRpcEngine implement
       return names[names.length-1];
     }
     
+    
     /** Construct an RPC server.
-     * @param instance the instance whose methods will be called
+     * @param protocolImpl the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * 
+     * @deprecated use Server#Server(Class, Object, 
+     *      Configuration, String, int, int, int, int, boolean, SecretManager)
+     */
+    @Deprecated
+    public Server(Object protocolImpl, Configuration conf, String bindAddress,
+        int port, int numHandlers, int numReaders, int queueSizePerHandler,
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
+            throws IOException {
+       this(null, protocolImpl,  conf,  bindAddress,   port,
+                   numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
+                   secretManager);
+   
+    }
+    
+    /** Construct an RPC server.
+     * @param protocolClass - the protocol being registered
+     *     can be null for compatibility with old usage (see below for details)
+     * @param protocolImpl the protocol impl that will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,  int port,
-                  int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, 
-                  SecretManager<? extends TokenIdentifier> secretManager) 
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler, 
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, numReaders,
           queueSizePerHandler, conf,
-          classNameBase(instance.getClass().getName()), secretManager);
-      this.instance = instance;
+          classNameBase(protocolImpl.getClass().getName()), secretManager);
+
       this.verbose = verbose;
+      
+      
+      Class<?>[] protocols;
+      if (protocolClass == null) { // derive protocol from impl
+        /*
+         * In order to remain compatible with the old usage where a single
+         * target protocolImpl is suppled for all protocol interfaces, and
+         * the protocolImpl is derived from the protocolClass(es) 
+         * we register all interfaces extended by the protocolImpl
+         */
+        protocols = getProtocolInterfaces(protocolImpl.getClass());
+
+      } else {
+        if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
+          throw new IOException("protocolClass "+ protocolClass +
+              " is not implemented by protocolImpl which is of class " +
+              protocolImpl.getClass());
+        }
+        // register protocol class and its super interfaces
+        registerProtocolAndImpl(protocolClass, protocolImpl);
+        protocols = getProtocolInterfaces(protocolClass);
+      }
+      for (Class<?> p : protocols) {
+        if (!p.equals(VersionedProtocol.class)) {
+          registerProtocolAndImpl(p, protocolImpl);
+        }
+      }
+
     }
 
-    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+ 
+    @Override
+    public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
+      addProtocol(
+        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
+      registerProtocolAndImpl(protocolClass, protocolImpl);
+      return this;
+    }
+    
+    /**
+     * Process a client call
+     * @param protocolName - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server.
+     * @param param  parameters
+     * @param receivedTime time at which the call receoved (for metrics)
+     * @return the call's return
+     * @throws IOException
+     */
+    public Writable call(String protocolName, Writable param, long receivedTime) 
     throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
 
-        Method method = protocol.getMethod(call.getMethodName(),
-                                           call.getParameterClasses());
-        method.setAccessible(true);
-
         // Verify rpc version
         if (call.getRpcVersion() != writableRpcVersion) {
           // Client is using a different version of WritableRpc
@@ -344,25 +593,51 @@ public class WritableRpcEngine implement
                   + call.getRpcVersion() + ", server side version="
                   + writableRpcVersion);
         }
-        
-        //Verify protocol version.
-        //Bypass the version check for VersionedProtocol
-        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
-          long clientVersion = call.getProtocolVersion();
-          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
-              .getProtocolSignature(protocol.getCanonicalName(), call
-                  .getProtocolVersion(), call.getClientMethodsHash());
-          long serverVersion = serverInfo.getVersion();
-          if (serverVersion != clientVersion) {
-            LOG.warn("Version mismatch: client version=" + clientVersion
-                + ", server version=" + serverVersion);
-            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
-                serverVersion);
+
+        long clientVersion = call.getProtocolVersion();
+        final String protoName;
+        ProtoClassProtoImpl protocolImpl;
+        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+          // VersionProtocol methods are often used by client to figure out
+          // which version of protocol to use.
+          //
+          // Versioned protocol methods should go the protocolName protocol
+          // rather than the declaring class of the method since the
+          // the declaring class is VersionedProtocol which is not 
+          // registered directly.
+          // Send the call to the highest  protocol version
+          protocolImpl = 
+              getHighestSupportedProtocol(protocolName).protocolTarget;
+        } else {
+          protoName = call.declaringClassProtocolName;
+
+          // Find the right impl for the protocol based on client version.
+          ProtoNameVer pv = 
+              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+          protocolImpl = protocolImplMap.get(pv);
+          if (protocolImpl == null) { // no match for Protocol AND Version
+             VerProtocolImpl highest = 
+                 getHighestSupportedProtocol(protoName);
+            if (highest == null) {
+              throw new IOException("Unknown protocol: " + protoName);
+            } else { // protocol supported but not the version that client wants
+              throw new RPC.VersionMismatch(protoName, clientVersion,
+                highest.version);
+            }
           }
         }
+        
+
+        // Invoke the protocol method
 
         long startTime = System.currentTimeMillis();
-        Object value = method.invoke(instance, call.getParameters());
+        Method method = 
+            protocolImpl.protocolClass.getMethod(call.getMethodName(),
+            call.getParameterClasses());
+        method.setAccessible(true);
+        rpcDetailedMetrics.init(protocolImpl.protocolClass);
+        Object value = 
+            method.invoke(protocolImpl.protocolImpl, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receivedTime);
         if (LOG.isDebugEnabled()) {

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sat Sep  3 00:31:05 2011
@@ -97,7 +97,7 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+    public Writable call(String protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
         // sleep a bit

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java Sat Sep  3 00:31:05 2011
@@ -72,7 +72,7 @@ public class TestIPCServerResponder exte
     }
 
     @Override
-    public Writable call(Class<?> protocol, Writable param, long receiveTime)
+    public Writable call(String protocol, Writable param, long receiveTime)
         throws IOException {
       if (sleep) {
         try {

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java?rev=1164771&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java Sat Sep  3 00:31:05 2011
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestMultipleProtocolServer {
+  private static final String ADDRESS = "0.0.0.0";
+  private static InetSocketAddress addr;
+  private static RPC.Server server;
+
+  private static Configuration conf = new Configuration();
+  
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo0 extends VersionedProtocol {
+    public static final long versionID = 0L;
+    String ping() throws IOException;
+    
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface Foo1 extends VersionedProtocol {
+    public static final long versionID = 1L;
+    String ping() throws IOException;
+    String ping2() throws IOException;
+  }
+  
+  @ProtocolInfo(protocolName="Foo")
+  interface FooUnimplemented extends VersionedProtocol {
+    public static final long versionID = 2L;
+    String ping() throws IOException;  
+  }
+  
+  interface Mixin extends VersionedProtocol{
+    public static final long versionID = 0L;
+    void hello() throws IOException;
+  }
+  interface Bar extends Mixin, VersionedProtocol {
+    public static final long versionID = 0L;
+    int echo(int i) throws IOException;
+  }
+  
+  
+  
+  class Foo0Impl implements Foo0 {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo0.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public String ping() {
+      return "Foo0";     
+    }
+    
+  }
+  
+  class Foo1Impl implements Foo1 {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Foo1.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                        getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public String ping() {
+      return "Foo1";
+    }
+
+    @Override
+    public String ping2() {
+      return "Foo1";
+      
+    }
+    
+  }
+
+  
+  class BarImpl implements Bar {
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return Bar.versionID;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      Class<? extends VersionedProtocol> inter;
+      try {
+        inter = (Class<? extends VersionedProtocol>)getClass().
+                                          getGenericInterfaces()[0];
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      return ProtocolSignature.getProtocolSignature(clientMethodsHash, 
+          getProtocolVersion(protocol, clientVersion), inter);
+    }
+
+    @Override
+    public int echo(int i) {
+      return i;
+    }
+
+    @Override
+    public void hello() {
+
+      
+    }
+  }
+  @Before
+  public void setUp() throws Exception {
+    // create a server with two handlers
+    server = RPC.getServer(Foo0.class,
+                              new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(Foo1.class, new Foo1Impl());
+    server.addProtocol(Bar.class, new BarImpl());
+    server.addProtocol(Mixin.class, new BarImpl());
+    server.start();
+    addr = NetUtils.getConnectAddress(server);
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+  }
+
+  @Test
+  public void test1() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(Foo0.class, Foo0.versionID, addr, conf);
+
+    Foo0 foo0 = (Foo0)proxy.getProxy(); 
+    Assert.assertEquals("Foo0", foo0.ping());
+    
+    
+    proxy = RPC.getProtocolProxy(Foo1.class, Foo1.versionID, addr, conf);
+    
+    
+    Foo1 foo1 = (Foo1)proxy.getProxy(); 
+    Assert.assertEquals("Foo1", foo1.ping());
+    Assert.assertEquals("Foo1", foo1.ping());
+    
+    
+    proxy = RPC.getProtocolProxy(Bar.class, Foo1.versionID, addr, conf);
+    
+    
+    Bar bar = (Bar)proxy.getProxy(); 
+    Assert.assertEquals(99, bar.echo(99));
+    
+    // Now test Mixin class method
+    
+    Mixin mixin = bar;
+    mixin.hello();
+  }
+  
+  
+  // Server does not implement the FooUnimplemented version of protocol Foo.
+  // See that calls to it fail.
+  @Test(expected=IOException.class)
+  public void testNonExistingProtocol() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    foo.ping();
+  }
+  
+  
+  /**
+   * getProtocolVersion of an unimplemented version should return highest version
+   * Similarly getProtocolSignature should work.
+   * @throws IOException
+   */
+  @Test
+  public void testNonExistingProtocol2() throws IOException {
+    ProtocolProxy<?> proxy;
+    proxy = RPC.getProtocolProxy(FooUnimplemented.class, 
+        FooUnimplemented.versionID, addr, conf);
+
+    FooUnimplemented foo = (FooUnimplemented)proxy.getProxy(); 
+    Assert.assertEquals(Foo1.versionID, 
+        foo.getProtocolVersion(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID));
+    foo.getProtocolSignature(RPC.getProtocolName(FooUnimplemented.class), 
+        FooUnimplemented.versionID, 0);
+  }
+  
+  @Test(expected=IOException.class)
+  public void testIncorrectServerCreation() throws IOException {
+    RPC.getServer(Foo1.class,
+        new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
+  }
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1164771&r1=1164770&r2=1164771&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java Sat Sep  3 00:31:05 2011
@@ -39,7 +39,7 @@ import org.junit.Test;
 public class TestRPCCompatibility {
   private static final String ADDRESS = "0.0.0.0";
   private static InetSocketAddress addr;
-  private static Server server;
+  private static RPC.Server server;
   private ProtocolProxy<?> proxy;
 
   public static final Log LOG =
@@ -52,10 +52,12 @@ public class TestRPCCompatibility {
     void ping() throws IOException;    
   }
   
-  public interface TestProtocol1 extends TestProtocol0 {
+  public interface TestProtocol1 extends VersionedProtocol, TestProtocol0 {
     String echo(String value) throws IOException;
   }
 
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol2 extends TestProtocol1 {
     int echo(int value)  throws IOException;
   }
@@ -89,11 +91,23 @@ public class TestRPCCompatibility {
   public static class TestImpl1 extends TestImpl0 implements TestProtocol1 {
     @Override
     public String echo(String value) { return value; }
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+        return TestProtocol1.versionID;
+    }
   }
 
   public static class TestImpl2 extends TestImpl1 implements TestProtocol2 {
     @Override
     public int echo(int value) { return value; }
+
+    @Override
+    public long getProtocolVersion(String protocol,
+        long clientVersion) throws IOException {
+      return TestProtocol2.versionID;
+    }
+
   }
   
   @After
@@ -109,8 +123,10 @@ public class TestRPCCompatibility {
   @Test  // old client vs new server
   public void testVersion0ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                            impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -172,8 +188,10 @@ public class TestRPCCompatibility {
   @Test // Compatible new client & old server
   public void testVersion2ClientVersion1Server() throws Exception {
     // create a server with two handlers
+    TestImpl1 impl = new TestImpl1();
     server = RPC.getServer(TestProtocol1.class,
-                              new TestImpl1(), ADDRESS, 0, 2, false, conf, null);
+                              impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -190,8 +208,10 @@ public class TestRPCCompatibility {
   @Test // equal version client and server
   public void testVersion2ClientVersion2Server() throws Exception {
     // create a server with two handlers
+    TestImpl2 impl = new TestImpl2();
     server = RPC.getServer(TestProtocol2.class,
-                              new TestImpl2(), ADDRESS, 0, 2, false, conf, null);
+                             impl, ADDRESS, 0, 2, false, conf, null);
+    server.addProtocol(TestProtocol0.class, impl);
     server.start();
     addr = NetUtils.getConnectAddress(server);
 
@@ -250,14 +270,16 @@ public class TestRPCCompatibility {
     assertEquals(hash1, hash2);
   }
   
+  @ProtocolInfo(protocolName=
+      "org.apache.hadoop.ipc.TestRPCCompatibility$TestProtocol1")
   public interface TestProtocol4 extends TestProtocol2 {
-    public static final long versionID = 1L;
+    public static final long versionID = 4L;
     int echo(int value)  throws IOException;
   }
   
   @Test
   public void testVersionMismatch() throws IOException {
-    server = RPC.getServer(TestProtocol2.class, new TestImpl0(), ADDRESS, 0, 2,
+    server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
         false, conf, null);
     server.start();
     addr = NetUtils.getConnectAddress(server);
@@ -268,7 +290,8 @@ public class TestRPCCompatibility {
       proxy.echo(21);
       fail("The call must throw VersionMismatch exception");
     } catch (IOException ex) {
-      Assert.assertTrue(ex.getMessage().contains("VersionMismatch"));
+      Assert.assertTrue("Expected version mismatch but got " + ex.getMessage(), 
+          ex.getMessage().contains("VersionMismatch"));
     }
   }
 }
\ No newline at end of file