Posted to common-commits@hadoop.apache.org by sr...@apache.org on 2011/12/04 21:44:38 UTC

svn commit: r1210208 [1/2] - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/ipc/protobuf/ src/proto/ src/test/java/org/apache/hadoop/ipc/ src/test/java/org/apache/had...

Author: sradia
Date: Sun Dec  4 20:44:36 2011
New Revision: 1210208

URL: http://svn.apache.org/viewvc?rev=1210208&view=rev
Log:
HADOOP-7862. Move the support for multiple protocols to a lower layer so that Writable, PB and Avro can all use it (includes HDFS and MR changes to match) (Sanjay) 

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/protobuf/TestRpcServiceProtos.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sun Dec  4 20:44:36 2011
@@ -61,6 +61,9 @@ Trunk (unreleased changes)
 
     HADOOP-7590. Mavenize streaming and MR examples. (tucu)
 
+    HADOOP-7862. Move the support for multiple protocols to a lower layer so
+    that Writable, PB and Avro can all use it (Sanjay)
+
   BUGS
 
     HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Sun Dec  4 20:44:36 2011
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements Rp
       super((Class)null, new Object(), conf,
             bindAddress, port, numHandlers, numReaders,
             queueSizePerHandler, verbose, secretManager);
-      super.addProtocol(TunnelProtocol.class, responder);
+      // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+      super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
       responder.addProtocol(iface, impl);
     }
 
 
     @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+    public Server
+      addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
         throws IOException {
       responder.addProtocol(protocolClass, protocolImpl);
       return this;

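The addProtocol change above illustrates the new registration model: a protocol is registered under an explicit RpcKind rather than through the old generic <PROTO, IMPL extends PROTO> signature. A minimal sketch of a call site under the new API follows; MyProtocol, MySecondProtocol and their impls are hypothetical names, and the RPC.getServer overload used is assumed from the existing API rather than shown in this patch.

    // Hypothetical usage of the new addProtocol(RpcKind, Class<?>, Object) API:
    // a second Writable-based protocol is served on the same port as the first.
    RPC.Server server = RPC.getServer(MyProtocol.class, myProtocolImpl,
        "0.0.0.0", 9000, conf);
    server.addProtocol(RpcKind.RPC_WRITABLE, MySecondProtocol.class,
        mySecondProtocolImpl);
    server.start();
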
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sun Dec  4 20:44:36 2011
@@ -1002,17 +1002,19 @@ public class Client {
   }
 
   /**
-   * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   *  for RPC_BUILTIN
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-    return call(RpcKind.RPC_WRITABLE, param, address);
+    return call(RpcKind.RPC_BUILTIN, param, address);
     
   }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1025,7 +1027,8 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, 
+   * ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1042,7 +1045,8 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1056,7 +1060,7 @@ public class Client {
 
   
   /**
-   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, 
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    */
@@ -1066,7 +1070,7 @@ public class Client {
       throws InterruptedException, IOException {
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+    return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /**
@@ -1087,21 +1091,28 @@ public class Client {
   }
   
   /**
-   * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
-   * except the rpcKind is RPC_WRITABLE
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_BUILTIN
    */
   public Writable call(Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-     return call(RpcKind.RPC_WRITABLE, param, remoteId);
+     return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
-  /** Make a call, passing <code>param</code>, to the IPC server defined by
-   * <code>remoteId</code>, returning the value.  
+  /** 
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
+   * 
+   * @param rpcKind
+   * @param rpcRequest - contains the serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @return the rpc response
    * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception. */
-  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
-      throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, param);
+   * threw an exception.
+   */
+  public Writable call(RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId) throws InterruptedException, IOException {
+    Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;

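The Client changes above make the serialization kind an explicit argument of call() and shift the kind-less convenience overloads from RPC_WRITABLE to RPC_BUILTIN. A hedged sketch of how an rpc engine now drives the client is shown below; the variables (addr, protocol, ticket, rpcTimeout, conf, invocation, client) are assumed to be in scope, and the ConnectionId factory call mirrors the one visible in the diff.

    // Engines pass their rpc kind explicitly when calling the IPC client.
    Client.ConnectionId remoteId = Client.ConnectionId.getConnectionId(
        addr, protocol, ticket, rpcTimeout, conf);
    Writable writableResponse =
        client.call(RpcKind.RPC_WRITABLE, invocation, remoteId);
    // The kind-less overload now defaults to RPC_BUILTIN (used by tests),
    // no longer to RPC_WRITABLE.
    Writable builtinResponse = client.call(invocation, remoteId);
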
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Sun Dec  4 20:44:36 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceExcept
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+  
+  static { // Register the rpcRequest deserializer for ProtobufRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(
+        RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+        new Server.ProtoBufRpcInvoker());
+  }
 
   private static final ClientCache CLIENTS = new ClientCache();
 
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implement
   }
 
   private static class Invoker implements InvocationHandler, Closeable {
-    private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+    private final Map<String, Message> returnTypes = 
+        new ConcurrentHashMap<String, Message>();
     private boolean isClosed = false;
-    private Client.ConnectionId remoteId;
-    private Client client;
+    private final Client.ConnectionId remoteId;
+    private final Client client;
+    private final long clientProtocolVersion;
+    private final String protocolName;
 
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implement
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory,
           RpcResponseWritable.class);
+      this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+      this.protocolName = RPC.getProtocolName(protocol);
     }
 
     private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implement
 
       Message param = (Message) params[1];
       builder.setRequest(param.toByteString());
+      // For protobuf, the {@code protocol} used when creating the client side
+      // proxy is the interface extending BlockingInterface, which has the 
+      // annotations such as ProtocolName etc.
+      //
+      // Using Method.getDeclaringClass(), as in WritableRpcEngine, to get at
+      // the protocol interface would return BlockingInterface, from which 
+      // the ProtocolName and Version annotations cannot be
+      // obtained.
+      //
+      // Hence we simply use the protocol class used to create the proxy.
+      // For PB this may limit the use of mixins on the client side.
+      builder.setDeclaringClassProtocolName(protocolName);
+      builder.setClientProtocolVersion(clientProtocolVersion);
       rpcRequest = builder.build();
       return rpcRequest;
     }
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implement
         RpcResponseWritable.class);
   }
   
+ 
 
   @Override
-  public RPC.Server getServer(Class<?> protocol, Object instance,
+  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager)
       throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers,
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocol, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
   
   private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implement
   }
 
   public static class Server extends RPC.Server {
-    private BlockingService service;
-    private boolean verbose;
-
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length - 1];
-    }
-
     /**
      * Construct an RPC server.
      * 
-     * @param instance the instance whose methods will be called
+     * @param protocolClass the class of protocol
+     * @param protocolImpl the protocolImpl whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,
-        int port, int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
         throws IOException {
       super(bindAddress, port, RpcRequestWritable.class, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(instance
+          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
               .getClass().getName()), secretManager);
-      this.service = (BlockingService) instance;
-      this.verbose = verbose;
+      this.verbose = verbose;  
+      registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, 
+          protocolClass, protocolImpl);
     }
 
-    /**
-     * This is a server side method, which is invoked over RPC. On success
-     * the return response has protobuf response payload. On failure, the
-     * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
-     * 
-     * In this method there three types of exceptions possible and they are
-     * returned in response as follows.
-     * <ol>
-     * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
-     * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
-     * this method returns in response the exception thrown by the service.</li>
-     * <li> Other exceptions thrown by the service. They are returned as
-     * it is.</li>
-     * </ol>
-     */
-    @Override
-    public Writable call(String protocol, Writable writableRequest,
-        long receiveTime) throws IOException {
-      RpcRequestWritable request = (RpcRequestWritable) writableRequest;
-      HadoopRpcRequestProto rpcRequest = request.message;
-      String methodName = rpcRequest.getMethodName();
-      if (verbose)
-        LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
-      MethodDescriptor methodDescriptor = service.getDescriptorForType()
-          .findMethodByName(methodName);
-      if (methodDescriptor == null) {
-        String msg = "Unknown method " + methodName + " called on " + protocol
-            + " protocol.";
-        LOG.warn(msg);
-        return handleException(new RpcServerException(msg));
-      }
-      Message prototype = service.getRequestPrototype(methodDescriptor);
-      Message param = prototype.newBuilderForType()
-          .mergeFrom(rpcRequest.getRequest()).build();
-      Message result;
-      try {
-        result = service.callBlockingMethod(methodDescriptor, null, param);
-      } catch (ServiceException e) {
-        Throwable cause = e.getCause();
-        return handleException(cause != null ? cause : e);
-      } catch (Exception e) {
-        return handleException(e);
-      }
-
-      HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
-      return new RpcResponseWritable(response);
-    }
-
-    private RpcResponseWritable handleException(Throwable e) {
+    private static RpcResponseWritable handleException(Throwable e) {
       HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
           .setExceptionName(e.getClass().getName())
           .setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implement
       return new RpcResponseWritable(response);
     }
 
-    private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+    private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
         Message message) {
       HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
           .setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implement
           .build();
       return res;
     }
+    
+    /**
+     * Protobuf invoker for {@link RpcInvoker}
+     */
+    static class ProtoBufRpcInvoker implements RpcInvoker {
+
+      @Override 
+      /**
+       * This is a server side method, which is invoked over RPC. On success
+       * the returned response has a protobuf response payload. On failure, the
+       * exception name and the stack trace are returned in the response.
+       * See {@link HadoopRpcResponseProto}
+       * 
+       * In this method three types of exceptions are possible, and they are
+       * returned in the response as follows.
+       * <ol>
+       * <li> Exceptions encountered in this method itself are returned 
+       * as {@link RpcServerException} </li>
+       * <li> Exceptions thrown by the service are wrapped in ServiceException;
+       * in that case this method returns in the response the exception 
+       * thrown by the service.</li>
+       * <li> Other exceptions thrown by the service are returned as
+       * they are.</li>
+       * </ol>
+       */
+      public Writable call(RPC.Server server, String protocol,
+          Writable writableRequest, long receiveTime) throws IOException {
+        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+        HadoopRpcRequestProto rpcRequest = request.message;
+        String methodName = rpcRequest.getMethodName();
+        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+        if (server.verbose)
+          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+        
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+        ProtoClassProtoImpl protocolImpl = 
+            server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+        if (protocolImpl == null) { // no match for Protocol AND Version
+          VerProtocolImpl highest = 
+              server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, 
+                  protoName);
+          if (highest == null) {
+            throw new IOException("Unknown protocol: " + protoName);
+          }
+          // protocol supported but not the version that client wants
+          throw new RPC.VersionMismatch(protoName, clientVersion,
+              highest.version);
+        }
+        
+        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+        MethodDescriptor methodDescriptor = service.getDescriptorForType()
+            .findMethodByName(methodName);
+        if (methodDescriptor == null) {
+          String msg = "Unknown method " + methodName + " called on " + protocol
+              + " protocol.";
+          LOG.warn(msg);
+          return handleException(new RpcServerException(msg));
+        }
+        Message prototype = service.getRequestPrototype(methodDescriptor);
+        Message param = prototype.newBuilderForType()
+            .mergeFrom(rpcRequest.getRequest()).build();
+        Message result;
+        try {
+          result = service.callBlockingMethod(methodDescriptor, null, param);
+        } catch (ServiceException e) {
+          Throwable cause = e.getCause();
+          return handleException(cause != null ? cause : e);
+        } catch (Exception e) {
+          return handleException(e);
+        }
+  
+        HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+        return new RpcResponseWritable(response);
+      }
+    }
   }
 }

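With the engine-level dispatch in place, ProtobufRpcEngine no longer holds a single BlockingService; ProtoBufRpcInvoker resolves the (protocol name, client version) pair against the per-kind map and then calls the matching BlockingService. A rough end-to-end sketch of setting up a protobuf server under this scheme follows; FooProtocolPB, FooService and FooServerImpl are placeholder names, and RPC.setProtocolEngine plus the RPC.getServer overload are assumed from the surrounding API rather than being part of this diff.

    // Wrap the server-side implementation in the protoc-generated BlockingService.
    BlockingService service =
        FooService.newReflectiveBlockingService(new FooServerImpl());
    // Bind the protocol interface to the protobuf engine, then create the server.
    // The Server constructor now records (protocolClass, service) under
    // RpcKind.RPC_PROTOCOL_BUFFER so ProtoBufRpcInvoker can find it per request.
    RPC.setProtocolEngine(conf, FooProtocolPB.class, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(FooProtocolPB.class, service,
        "0.0.0.0", 9090, conf);
    server.start();
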
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Sun Dec  4 20:44:36 2011
@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPol
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ProtocolInfo {
   String protocolName();  // the name of the protocol (i.e. rpc service)
+  long protocolVersion() default -1; // default of -1 means not defined; fall back to the versionID field
 }

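The new protocolVersion element on @ProtocolInfo pairs with the RPC.getProtocolVersion() method introduced in RPC.java below: the annotation is consulted first, and a value of -1 (the default) means the old static versionID field is read reflectively instead. A small illustration, with the interface names invented for the example:

    // New style: the version travels with the annotation, so the interface
    // does not need a versionID constant.
    @ProtocolInfo(protocolName = "org.example.ClientProtocol", protocolVersion = 1)
    interface ClientProtocolPB {
      // rpc methods ...
    }

    // Old style: protocolVersion() stays at -1, so RPC.getProtocolVersion()
    // falls back to the static versionID field.
    interface OldStyleProtocol extends VersionedProtocol {
      long versionID = 1L;
      // rpc methods ...
    }
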
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Sun Dec  4 20:44:36 2011
@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
   
   private void fetchServerMethods(Method method) throws IOException {
     long clientVersion;
-    try {
-      Field versionField = method.getDeclaringClass().getField("versionID");
-      versionField.setAccessible(true);
-      clientVersion = versionField.getLong(method.getDeclaringClass());
-    } catch (NoSuchFieldException ex) {
-      throw new RuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    }
+    clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
     int clientMethodsHash = ProtocolSignature.getFingerprint(method
         .getDeclaringClass().getMethods());
     ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
-        .getProtocolSignature(protocol.getName(), clientVersion,
+        .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
             clientMethodsHash);
     long serverVersion = serverInfo.getVersion();
     if (serverVersion != clientVersion) {

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Sun Dec  4 20:44:36 2011
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ProtocolSignature implements Writable {
   static {               // register a ctor
     WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implement
   /**
    * A cache that maps a protocol's name to its signature & finger print
    */
-  final private static HashMap<String, ProtocolSigFingerprint> 
+  private final static HashMap<String, ProtocolSigFingerprint> 
      PROTOCOL_FINGERPRINT_CACHE = 
        new HashMap<String, ProtocolSigFingerprint>();
   
+  @VisibleForTesting
+  public static void resetCache() {
+    PROTOCOL_FINGERPRINT_CACHE.clear();
+  }
+  
   /**
    * Return a protocol's signature and finger print from cache
    * 
@@ -177,7 +184,7 @@ public class ProtocolSignature implement
    */
   private static ProtocolSigFingerprint getSigFingerprint(
       Class <? extends VersionedProtocol> protocol, long serverVersion) {
-    String protocolName = protocol.getName();
+    String protocolName = RPC.getProtocolName(protocol);
     synchronized (PROTOCOL_FINGERPRINT_CACHE) {
       ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
       if (sig == null) {

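resetCache() exists because the protocol-name-to-fingerprint cache is static and is now keyed by the (possibly annotated) protocol name; tests that register different versions of a protocol under the same name need to clear it between cases. A plausible JUnit 4 usage, with the fixture method invented for the example:

    // Clear the static fingerprint cache so each test case starts clean.
    @Before
    public void setUp() {
      ProtocolSignature.resetCache();
    }
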
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sun Dec  4 20:44:36 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.io.*;
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.Reflection
  * the protocol instance is transmitted.
  */
 public class RPC {
+  
+  interface RpcInvoker {   
+    /**
+     * Process a client call on the server side
+     * @param server the server within whose context this rpc call is made
+     * @param protocol - the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server).
+     * @param rpcRequest - the deserialized rpc request
+     * @param receiveTime time at which the call was received (for metrics)
+     * @return the call's return value
+     * @throws IOException
+     **/
+    public Writable call(Server server, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException ;
+  }
+  
   static final Log LOG = LogFactory.getLog(RPC.class);
   
+  /**
+   * Get all superInterfaces that extend VersionedProtocol
+   * @param childInterfaces
+   * @return the super interfaces that extend VersionedProtocol
+   */
+  static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
+
+    for (Class<?> childInterface : childInterfaces) {
+      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
+          allInterfaces.add(childInterface);
+          allInterfaces.addAll(
+              Arrays.asList(
+                  getSuperInterfaces(childInterface.getInterfaces())));
+      } else {
+        LOG.warn("Interface " + childInterface +
+              " ignored because it does not extend VersionedProtocol");
+      }
+    }
+    return allInterfaces.toArray(new Class[allInterfaces.size()]);
+  }
+  
+  /**
+   * Get all interfaces that the given protocol implements or extends
+   * which are assignable from VersionedProtocol.
+   */
+  static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    Class<?>[] interfaces  = protocol.getInterfaces();
+    return getSuperInterfaces(interfaces);
+  }
   
   /**
    * Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
     if (protocol == null) {
       return null;
     }
-    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
     return  (anno == null) ? protocol.getName() : anno.protocolName();
   }
+  
+  /**
+   * Get the protocol version from protocol class.
+   * If the protocol class has a ProtocolInfo annotation, then get the protocol
+   * version from the annotation; otherwise use the static versionID field.
+   */
+  static public long getProtocolVersion(Class<?> protocol) {
+    if (protocol == null) {
+      throw new IllegalArgumentException("Null protocol");
+    }
+    long version;
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    if (anno != null) {
+      version = anno.protocolVersion();
+      if (version != -1)
+        return version;
+    }
+    try {
+      Field versionField = protocol.getField("versionID");
+      versionField.setAccessible(true);
+      return versionField.getLong(protocol);
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
   private RPC() {}                                  // no public ctor
 
@@ -590,6 +668,144 @@ public class RPC {
 
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
+   boolean verbose;
+   static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+   
+   /**
+    * Store a map of protocol and version to its implementation
+    */
+   /**
+    *  The key in Map
+    */
+   static class ProtoNameVer {
+     final String protocol;
+     final long   version;
+     ProtoNameVer(String protocol, long ver) {
+       this.protocol = protocol;
+       this.version = ver;
+     }
+     @Override
+     public boolean equals(Object o) {
+       if (o == null) 
+         return false;
+       if (this == o) 
+         return true;
+       if (! (o instanceof ProtoNameVer))
+         return false;
+       ProtoNameVer pv = (ProtoNameVer) o;
+       return ((pv.protocol.equals(this.protocol)) && 
+           (pv.version == this.version));     
+     }
+     @Override
+     public int hashCode() {
+       return protocol.hashCode() * 37 + (int) version;    
+     }
+   }
+   
+   /**
+    * The value in map
+    */
+   static class ProtoClassProtoImpl {
+     final Class<?> protocolClass;
+     final Object protocolImpl; 
+     ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+       this.protocolClass = protocolClass;
+       this.protocolImpl = protocolImpl;
+     }
+   }
+
+   ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
+       new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+   
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+     if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
+       for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
+         protocolImplMapArray.add(
+             new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+       }
+     }
+     return protocolImplMapArray.get(rpcKind.ordinal());   
+   }
+   
+   // Register  protocol and its impl for rpc calls
+   void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
+       Object protocolImpl) throws IOException {
+     String protocolName = RPC.getProtocolName(protocolClass);
+     long version;
+     
+
+     try {
+       version = RPC.getProtocolVersion(protocolClass);
+     } catch (Exception ex) {
+       LOG.warn("Protocol "  + protocolClass + 
+            " NOT registered as cannot get protocol version ");
+       return;
+     }
+
+
+     getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+         new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+     LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
+         " ProtocolImpl=" + protocolImpl.getClass().getName() + 
+         " protocolClass=" + protocolClass.getName());
+   }
+   
+   static class VerProtocolImpl {
+     final long version;
+     final ProtoClassProtoImpl protocolTarget;
+     VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+       this.version = ver;
+       this.protocolTarget = protocolTarget;
+     }
+   }
+   
+   
+   @SuppressWarnings("unused") // will be useful later.
+   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+       String protocolName) {
+     VerProtocolImpl[] resultk = 
+         new  VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
+     int i = 0;
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
+                                       getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         resultk[i++] = 
+             new VerProtocolImpl(pv.getKey().version, pv.getValue());
+       }
+     }
+     if (i == 0) {
+       return null;
+     }
+     VerProtocolImpl[] result = new VerProtocolImpl[i];
+     System.arraycopy(resultk, 0, result, 0, i);
+     return result;
+   }
+   
+   VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, 
+       String protocolName) {    
+     Long highestVersion = 0L;
+     ProtoClassProtoImpl highest = null;
+     LOG.debug("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : 
+           getProtocolImplMap(rpcKind).entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         if ((highest == null) || (pv.getKey().version > highestVersion)) {
+           highest = pv.getValue();
+           highestVersion = pv.getKey().version;
+         } 
+       }
+     }
+     if (highest == null) {
+       return null;
+     }
+     return new VerProtocolImpl(highestVersion,  highest);   
+   }
   
     protected Server(String bindAddress, int port, 
                      Class<? extends Writable> paramClass, int handlerCount,
@@ -606,11 +822,17 @@ public class RPC {
      * @param protocolImpl - the impl of the protocol that will be called
      * @return the server (for convenience)
      */
-    public <PROTO, IMPL extends PROTO>
-      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
-    ) throws IOException {
-      throw new IOException("addProtocol Not Implemented");
+    public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+        Object protocolImpl) throws IOException {
+      registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+      return this;
+    }
+    
+    @Override
+    public Writable call(RpcKind rpcKind, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException {
+      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
+          receiveTime);
     }
   }
-
 }

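RPC.Server now keeps one (protocol name, version) to (protocol class, impl) map per RpcKind and routes call() through the RpcInvoker registered for that kind. The sketch below shows the shape of an invoker implementing that contract; it is illustrative only (the real ones are WritableRpcInvoker and ProtoBufRpcInvoker), and since RpcInvoker is package-private it assumes the class lives in org.apache.hadoop.ipc.

    // A deliberately trivial invoker: it echoes the request back. A real invoker
    // looks up server.getProtocolImplMap(rpcKind) for the requested
    // (protocol name, version), falls back to getHighestSupportedProtocol()
    // to report a VersionMismatch, and then dispatches the method.
    static class EchoRpcInvoker implements RPC.RpcInvoker {
      @Override
      public Writable call(RPC.Server server, String protocol,
          Writable rpcRequest, long receiveTime) throws IOException {
        return rpcRequest;
      }
    }
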
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Sun Dec  4 20:44:36 2011
@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements
   }
   
   public enum RpcKind {
-    RPC_BUILTIN ((short ) 1),  // Used for built in calls
-    RPC_WRITABLE ((short ) 2),
-    RPC_PROTOCOL_BUFFER ((short)3), 
-    RPC_AVRO ((short)4);
-    
+    RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+    RPC_AVRO ((short) 4);            // Use AvroRpcEngine 
+    static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
     private final short value;
-    private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
     RpcKind(short val) {
       this.value = val;
     }

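Exposing MAX_INDEX lets callers size per-kind data structures once and index them by enum ordinal, which is what the protocolImplMapArray in RPC.Server above does. A minimal sketch of that pattern (the map's value type is arbitrary here, java.util imports are assumed, and the code is assumed to live in the same package since MAX_INDEX is package-private):

    // One slot per RpcKind, sized from MAX_INDEX and indexed by ordinal().
    List<Map<String, Object>> perKind =
        new ArrayList<Map<String, Object>>(RpcKind.MAX_INDEX);
    for (int i = 0; i <= RpcKind.MAX_INDEX; ++i) {
      perKind.add(new HashMap<String, Object>());
    }
    Map<String, Object> writableSlot =
        perKind.get(RpcKind.RPC_WRITABLE.ordinal());
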
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sun Dec  4 20:44:36 2011
@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteCha
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -66,6 +67,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -133,6 +135,59 @@ public abstract class Server {
    * Initial and max size of response buffer
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue {
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register an RPC kind and the class used to deserialize the rpc request.
+   * 
+   * Called by static initializers of the rpc engines.
+   * @param rpcKind
+   * @param rpcRequestWrapperClass - this class is used to deserialize the
+   *  rpc request.
+   *  @param rpcInvoker - used to process the calls on the server side.
+   */
+  
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    RpcKindMapValue  old = 
+        rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    if (old != null) {
+      rpcKindMap.put(rpcKind, old);
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    LOG.info("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  public Class<? extends Writable> getRpcRequestWrapper(
+      RpcKind rpcKind) {
+    if (rpcRequestClass != null)
+       return rpcRequestClass;
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcRequestWrapperClass; 
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue val = rpcKindMap.get(rpcKind);
+    return (val == null) ? null : val.rpcInvoker; 
+  }
+  
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
@@ -197,7 +252,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -1425,9 +1480,27 @@ public abstract class Server {
         throw new IOException("IPC Server does not implement operation" + 
               header.getOperation());
       }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.)
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
       Writable rpcRequest;
       try { //Read the rpc request
-        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
         rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
@@ -1519,7 +1592,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.rpcRequest, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1528,7 +1601,7 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocolName, 
+                       return call(call.rpcKind, call.connection.protocolName, 
                                    call.rpcRequest, call.timestamp);
 
                      }
@@ -1590,24 +1663,33 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
   * be of the named class.  The <code>handlerCount</code> determines
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpc request wrapper class must have 
+   * been registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
@@ -1797,17 +1879,17 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(String, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(String protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   /**
    * Authorize the incoming client connection.
@@ -1957,5 +2039,5 @@ public abstract class Server {
 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }

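registerProtocolEngine() is the hook that ties an engine to the shared Server: each engine's static initializer registers, per RpcKind, the Writable wrapper used to deserialize its requests and the RpcInvoker that processes them, and the connection reader then picks the wrapper from the payload header's kind. A hedged sketch of that contract for a hypothetical engine (MyRpcEngine, MyRequestWritable and MyRpcInvoker are placeholders, and the kind chosen is only illustrative):

    public class MyRpcEngine {
      static {
        // Tell the common Server how to deserialize this kind of request and
        // which invoker handles it on the server side; re-registering a kind
        // throws IllegalArgumentException.
        org.apache.hadoop.ipc.Server.registerProtocolEngine(
            RpcKind.RPC_AVRO, MyRequestWritable.class, new MyRpcInvoker());
      }
    }
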
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Sun Dec  4 20:44:36 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTarge
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
- 
-  /**
-   * Get all superInterfaces that extend VersionedProtocol
-   * @param childInterfaces
-   * @return the super interfaces that extend VersionedProtocol
-   */
-  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
-    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
-    for (Class<?> childInterface : childInterfaces) {
-      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
-          allInterfaces.add(childInterface);
-          allInterfaces.addAll(
-              Arrays.asList(
-                  getSuperInterfaces(childInterface.getInterfaces())));
-      } else {
-        LOG.warn("Interface " + childInterface +
-              " ignored because it does not extend VersionedProtocol");
-      }
-    }
-    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
-  }
-  
-  /**
-   * Get all interfaces that the given protocol implements or extends
-   * which are assignable from VersionedProtocol.
-   */
-  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
-    Class<?>[] interfaces  = protocol.getInterfaces();
-    return getSuperInterfaces(interfaces);
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
   }
 
   
@@ -120,15 +88,7 @@ public class WritableRpcEngine implement
         clientVersion = 0;
         clientMethodsHash = 0;
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
@@ -329,140 +289,25 @@ public class WritableRpcEngine implement
 
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private boolean verbose;
-    
-    /**
-     *  The key in Map
-     */
-    static class ProtoNameVer {
-      final String protocol;
-      final long   version;
-      ProtoNameVer(String protocol, long ver) {
-        this.protocol = protocol;
-        this.version = ver;
-      }
-      @Override
-      public boolean equals(Object o) {
-        if (o == null) 
-          return false;
-        if (this == o) 
-          return true;
-        if (! (o instanceof ProtoNameVer))
-          return false;
-        ProtoNameVer pv = (ProtoNameVer) o;
-        return ((pv.protocol.equals(this.protocol)) && 
-            (pv.version == this.version));     
-      }
-      @Override
-      public int hashCode() {
-        return protocol.hashCode() * 37 + (int) version;    
-      }
-    }
-    
-    /**
-     * The value in map
-     */
-    static class ProtoClassProtoImpl {
-      final Class<?> protocolClass;
-      final Object protocolImpl; 
-      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
-        this.protocolClass = protocolClass;
-        this.protocolImpl = protocolImpl;
-      }
-    }
-    
-    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
-        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-    
-    // Register  protocol and its impl for rpc calls
-    private void registerProtocolAndImpl(Class<?> protocolClass, 
-        Object protocolImpl) throws IOException {
-      String protocolName = RPC.getProtocolName(protocolClass);
-      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
-      long version;
-      try {
-        version = vp.getProtocolVersion(protocolName, 0);
-      } catch (Exception ex) {
-        LOG.warn("Protocol "  + protocolClass + 
-             " NOT registered as getProtocolVersion throws exception ");
-        return;
-      }
-      protocolImplMap.put(new ProtoNameVer(protocolName, version),
-          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
-      LOG.debug("Protocol Name = " + protocolName +  " version=" + version +
-          " ProtocolImpl=" + protocolImpl.getClass().getName() + 
-          " protocolClass=" + protocolClass.getName());
-    }
-    
-    private static class VerProtocolImpl {
-      final long version;
-      final ProtoClassProtoImpl protocolTarget;
-      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
-        this.version = ver;
-        this.protocolTarget = protocolTarget;
-      }
-    }
-    
-    
-    @SuppressWarnings("unused") // will be useful later.
-    private VerProtocolImpl[] getSupportedProtocolVersions(
-        String protocolName) {
-      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
-      int i = 0;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
-                                        protocolImplMap.entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          resultk[i++] = 
-              new VerProtocolImpl(pv.getKey().version, pv.getValue());
-        }
-      }
-      if (i == 0) {
-        return null;
-      }
-      VerProtocolImpl[] result = new VerProtocolImpl[i];
-      System.arraycopy(resultk, 0, result, 0, i);
-      return result;
-    }
-    
-    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
-      Long highestVersion = 0L;
-      ProtoClassProtoImpl highest = null;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
-          .entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          if ((highest == null) || (pv.getKey().version > highestVersion)) {
-            highest = pv.getValue();
-            highestVersion = pv.getKey().version;
-          } 
-        }
-      }
-      if (highest == null) {
-        return null;
-      }
-      return new VerProtocolImpl(highestVersion,  highest);   
-    }
- 
-
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * 
-     * @deprecated Use #Server(Class, Object, Configuration, String, int)
-     *    
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
      */
     @Deprecated
     public Server(Object instance, Configuration conf, String bindAddress,
-        int port) 
-      throws IOException {
+        int port) throws IOException {
       this(null, instance, conf,  bindAddress, port);
     }
     
     
     /** Construct an RPC server.
-     * @param protocol class
-     * @param instance the instance whose methods will be called
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implement
           false, null);
     }
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implement
    
     }
     
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolClass - the protocol being registered
      *     can be null for compatibility with old usage (see below for details)
      * @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implement
         int numHandlers, int numReaders, int queueSizePerHandler, 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
           classNameBase(protocolImpl.getClass().getName()), secretManager);
 
@@ -535,7 +373,7 @@ public class WritableRpcEngine implement
          * the protocolImpl is derived from the protocolClass(es) 
          * we register all interfaces extended by the protocolImpl
          */
-        protocols = getProtocolInterfaces(protocolImpl.getClass());
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
       } else {
         if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implement
               protocolImpl.getClass());
         }
         // register protocol class and its super interfaces
-        registerProtocolAndImpl(protocolClass, protocolImpl);
-        protocols = getProtocolInterfaces(protocolClass);
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
       }
       for (Class<?> p : protocols) {
         if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(p, protocolImpl);
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
         }
       }
 
     }
 
- 
-    @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(
-        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
-      registerProtocolAndImpl(protocolClass, protocolImpl);
-      return this;
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
     }
     
-    /**
-     * Process a client call
-     * @param protocolName - the protocol name (the class of the client proxy
-     *      used to make calls to the rpc server.
-     * @param param  parameters
-     * @param receivedTime time at which the call receoved (for metrics)
-     * @return the call's return
-     * @throws IOException
-     */
-    public Writable call(String protocolName, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
+
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
 
-        long clientVersion = call.getProtocolVersion();
-        final String protoName;
-        ProtoClassProtoImpl protocolImpl;
-        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-          // VersionProtocol methods are often used by client to figure out
-          // which version of protocol to use.
-          //
-          // Versioned protocol methods should go the protocolName protocol
-          // rather than the declaring class of the method since the
-          // the declaring class is VersionedProtocol which is not 
-          // registered directly.
-          // Send the call to the highest  protocol version
-          protocolImpl = 
-              getHighestSupportedProtocol(protocolName).protocolTarget;
-        } else {
-          protoName = call.declaringClassProtocolName;
-
-          // Find the right impl for the protocol based on client version.
-          ProtoNameVer pv = 
-              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-          protocolImpl = protocolImplMap.get(pv);
-          if (protocolImpl == null) { // no match for Protocol AND Version
-             VerProtocolImpl highest = 
-                 getHighestSupportedProtocol(protoName);
+          long clientVersion = call.getProtocolVersion();
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
             if (highest == null) {
-              throw new IOException("Unknown protocol: " + protoName);
-            } else { // protocol supported but not the version that client wants
-              throw new RPC.VersionMismatch(protoName, clientVersion,
-                highest.version);
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
             }
           }
-        }
-        
+          
 
-        // Invoke the protocol method
+          // Invoke the protocol method
 
-        long startTime = System.currentTimeMillis();
-        Method method = 
-            protocolImpl.protocolClass.getMethod(call.getMethodName(),
-            call.getParameterClasses());
-        method.setAccessible(true);
-        rpcDetailedMetrics.init(protocolImpl.protocolClass);
-        Object value = 
-            method.invoke(protocolImpl.protocolImpl, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
     }
   }
-
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }
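
The net effect of the WritableRpcEngine changes above is that the protocol registry (the ProtoNameVer/ProtoClassProtoImpl map and the highest-version lookup deleted here) moves up into RPC.Server and gains an RpcKind dimension, and the Writable-specific dispatch becomes a pluggable WritableRpcInvoker that consults server.getProtocolImplMap(RpcKind.RPC_WRITABLE). The standalone sketch below uses hypothetical class names (not the Hadoop API) purely to illustrate the resolution rule the invoker applies: an exact match on (protocol name, client version) first, otherwise report the highest registered version so a version mismatch can be raised; calls declared on VersionedProtocol itself instead go straight to that highest registered version.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration of (protocol name, version) -> impl resolution;
    // not the actual Hadoop classes.
    public class ProtocolRegistrySketch {
      static final class Key {
        final String protocol;
        final long version;
        Key(String protocol, long version) { this.protocol = protocol; this.version = version; }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Key)) return false;
          Key k = (Key) o;
          return k.protocol.equals(protocol) && k.version == version;
        }
        @Override public int hashCode() { return protocol.hashCode() * 37 + (int) version; }
      }

      private final Map<Key, Object> impls = new HashMap<Key, Object>();

      void register(String protocol, long version, Object impl) {
        impls.put(new Key(protocol, version), impl);
      }

      // Exact (protocol, clientVersion) match first; otherwise report the highest
      // version the server has registered so the caller can flag the mismatch.
      Object resolve(String protocol, long clientVersion) {
        Object exact = impls.get(new Key(protocol, clientVersion));
        if (exact != null) {
          return exact;
        }
        long highest = -1;
        for (Key k : impls.keySet()) {
          if (k.protocol.equals(protocol) && k.version > highest) {
            highest = k.version;
          }
        }
        if (highest < 0) {
          throw new IllegalStateException("Unknown protocol: " + protocol);
        }
        throw new IllegalStateException("Version mismatch for " + protocol
            + ": client=" + clientVersion + ", server=" + highest);
      }
    }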

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java Sun Dec  4 20:44:36 2011
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: hadoop_rpc.proto
 
@@ -18,6 +35,14 @@ public final class HadoopRpcProtos {
     // optional bytes request = 2;
     boolean hasRequest();
     com.google.protobuf.ByteString getRequest();
+    
+    // required string declaringClassProtocolName = 3;
+    boolean hasDeclaringClassProtocolName();
+    String getDeclaringClassProtocolName();
+    
+    // required uint64 clientProtocolVersion = 4;
+    boolean hasClientProtocolVersion();
+    long getClientProtocolVersion();
   }
   public static final class HadoopRpcRequestProto extends
       com.google.protobuf.GeneratedMessage
@@ -90,9 +115,53 @@ public final class HadoopRpcProtos {
       return request_;
     }
     
+    // required string declaringClassProtocolName = 3;
+    public static final int DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER = 3;
+    private java.lang.Object declaringClassProtocolName_;
+    public boolean hasDeclaringClassProtocolName() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getDeclaringClassProtocolName() {
+      java.lang.Object ref = declaringClassProtocolName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          declaringClassProtocolName_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getDeclaringClassProtocolNameBytes() {
+      java.lang.Object ref = declaringClassProtocolName_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        declaringClassProtocolName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required uint64 clientProtocolVersion = 4;
+    public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 4;
+    private long clientProtocolVersion_;
+    public boolean hasClientProtocolVersion() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public long getClientProtocolVersion() {
+      return clientProtocolVersion_;
+    }
+    
     private void initFields() {
       methodName_ = "";
       request_ = com.google.protobuf.ByteString.EMPTY;
+      declaringClassProtocolName_ = "";
+      clientProtocolVersion_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -103,6 +172,14 @@ public final class HadoopRpcProtos {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (!hasDeclaringClassProtocolName()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasClientProtocolVersion()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -116,6 +193,12 @@ public final class HadoopRpcProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, request_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getDeclaringClassProtocolNameBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt64(4, clientProtocolVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -133,6 +216,14 @@ public final class HadoopRpcProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, request_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getDeclaringClassProtocolNameBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(4, clientProtocolVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -166,6 +257,16 @@ public final class HadoopRpcProtos {
         result = result && getRequest()
             .equals(other.getRequest());
       }
+      result = result && (hasDeclaringClassProtocolName() == other.hasDeclaringClassProtocolName());
+      if (hasDeclaringClassProtocolName()) {
+        result = result && getDeclaringClassProtocolName()
+            .equals(other.getDeclaringClassProtocolName());
+      }
+      result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion());
+      if (hasClientProtocolVersion()) {
+        result = result && (getClientProtocolVersion()
+            == other.getClientProtocolVersion());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -183,6 +284,14 @@ public final class HadoopRpcProtos {
         hash = (37 * hash) + REQUEST_FIELD_NUMBER;
         hash = (53 * hash) + getRequest().hashCode();
       }
+      if (hasDeclaringClassProtocolName()) {
+        hash = (37 * hash) + DECLARINGCLASSPROTOCOLNAME_FIELD_NUMBER;
+        hash = (53 * hash) + getDeclaringClassProtocolName().hashCode();
+      }
+      if (hasClientProtocolVersion()) {
+        hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getClientProtocolVersion());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -303,6 +412,10 @@ public final class HadoopRpcProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         request_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000002);
+        declaringClassProtocolName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        clientProtocolVersion_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -349,6 +462,14 @@ public final class HadoopRpcProtos {
           to_bitField0_ |= 0x00000002;
         }
         result.request_ = request_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.declaringClassProtocolName_ = declaringClassProtocolName_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.clientProtocolVersion_ = clientProtocolVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -371,6 +492,12 @@ public final class HadoopRpcProtos {
         if (other.hasRequest()) {
           setRequest(other.getRequest());
         }
+        if (other.hasDeclaringClassProtocolName()) {
+          setDeclaringClassProtocolName(other.getDeclaringClassProtocolName());
+        }
+        if (other.hasClientProtocolVersion()) {
+          setClientProtocolVersion(other.getClientProtocolVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -380,6 +507,14 @@ public final class HadoopRpcProtos {
           
           return false;
         }
+        if (!hasDeclaringClassProtocolName()) {
+          
+          return false;
+        }
+        if (!hasClientProtocolVersion()) {
+          
+          return false;
+        }
         return true;
       }
       
@@ -416,6 +551,16 @@ public final class HadoopRpcProtos {
               request_ = input.readBytes();
               break;
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              declaringClassProtocolName_ = input.readBytes();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              clientProtocolVersion_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -482,6 +627,63 @@ public final class HadoopRpcProtos {
         return this;
       }
       
+      // required string declaringClassProtocolName = 3;
+      private java.lang.Object declaringClassProtocolName_ = "";
+      public boolean hasDeclaringClassProtocolName() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getDeclaringClassProtocolName() {
+        java.lang.Object ref = declaringClassProtocolName_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          declaringClassProtocolName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setDeclaringClassProtocolName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        declaringClassProtocolName_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearDeclaringClassProtocolName() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        declaringClassProtocolName_ = getDefaultInstance().getDeclaringClassProtocolName();
+        onChanged();
+        return this;
+      }
+      void setDeclaringClassProtocolName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000004;
+        declaringClassProtocolName_ = value;
+        onChanged();
+      }
+      
+      // required uint64 clientProtocolVersion = 4;
+      private long clientProtocolVersion_ ;
+      public boolean hasClientProtocolVersion() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public long getClientProtocolVersion() {
+        return clientProtocolVersion_;
+      }
+      public Builder setClientProtocolVersion(long value) {
+        bitField0_ |= 0x00000008;
+        clientProtocolVersion_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearClientProtocolVersion() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        clientProtocolVersion_ = 0L;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:HadoopRpcRequestProto)
     }
     
@@ -1706,16 +1908,18 @@ public final class HadoopRpcProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\020hadoop_rpc.proto\"<\n\025HadoopRpcRequestPr" +
+      "\n\020hadoop_rpc.proto\"\177\n\025HadoopRpcRequestPr" +
       "oto\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002 \001(\014" +
-      "\"D\n\027HadoopRpcExceptionProto\022\025\n\rexception" +
-      "Name\030\001 \001(\t\022\022\n\nstackTrace\030\002 \001(\t\"\272\001\n\026Hadoo" +
-      "pRpcResponseProto\0226\n\006status\030\001 \002(\0162&.Hado" +
-      "opRpcResponseProto.ResponseStatus\022\020\n\010res" +
-      "ponse\030\002 \001(\014\022+\n\texception\030\003 \001(\0132\030.HadoopR" +
-      "pcExceptionProto\")\n\016ResponseStatus\022\013\n\007SU" +
-      "CCESS\020\001\022\n\n\006ERRROR\020\002B4\n\036org.apache.hadoop" +
-      ".ipc.protobufB\017HadoopRpcProtos\240\001\001"
+      "\022\"\n\032declaringClassProtocolName\030\003 \002(\t\022\035\n\025" +
+      "clientProtocolVersion\030\004 \002(\004\"D\n\027HadoopRpc" +
+      "ExceptionProto\022\025\n\rexceptionName\030\001 \001(\t\022\022\n" +
+      "\nstackTrace\030\002 \001(\t\"\272\001\n\026HadoopRpcResponseP" +
+      "roto\0226\n\006status\030\001 \002(\0162&.HadoopRpcResponse" +
+      "Proto.ResponseStatus\022\020\n\010response\030\002 \001(\014\022+" +
+      "\n\texception\030\003 \001(\0132\030.HadoopRpcExceptionPr" +
+      "oto\")\n\016ResponseStatus\022\013\n\007SUCCESS\020\001\022\n\n\006ER",
+      "RROR\020\002B4\n\036org.apache.hadoop.ipc.protobuf" +
+      "B\017HadoopRpcProtos\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1727,7 +1931,7 @@ public final class HadoopRpcProtos {
           internal_static_HadoopRpcRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_HadoopRpcRequestProto_descriptor,
-              new java.lang.String[] { "MethodName", "Request", },
+              new java.lang.String[] { "MethodName", "Request", "DeclaringClassProtocolName", "ClientProtocolVersion", },
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.class,
               org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto.Builder.class);
           internal_static_HadoopRpcExceptionProto_descriptor =
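
The regenerated builder above gains setDeclaringClassProtocolName and setClientProtocolVersion, so a client assembling a protobuf RPC request now supplies the protocol name and version alongside the method name and serialized argument. A minimal sketch of building such a request with the generated classes follows; the protocol, method, and payload values are placeholders, not a real protocol.

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;

    // Illustrative only: the protocol/method/payload values are made up.
    public class RequestBuildSketch {
      public static void main(String[] args) {
        HadoopRpcRequestProto req = HadoopRpcRequestProto.newBuilder()
            .setMethodName("echo")                                      // field 1
            .setRequest(ByteString.copyFromUtf8("serialized args"))     // field 2
            .setDeclaringClassProtocolName("org.example.EchoProtocol")  // new field 3
            .setClientProtocolVersion(1L)                               // new field 4
            .build();
        System.out.println("request size = " + req.getSerializedSize() + " bytes");
      }
    }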

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto Sun Dec  4 20:44:36 2011
@@ -34,6 +34,12 @@ message HadoopRpcRequestProto {
 
   /** Bytes corresponding to the client protobuf request */
   optional bytes request = 2;
+  
+  /** protocol name of class declaring the called method */ 
+  required string declaringClassProtocolName = 3;
+  
+  /** protocol version of class declaring the called method */
+  required uint64 clientProtocolVersion = 4;
 }
 
 /**
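
Because the two new fields are declared required, a request that omits them fails protobuf initialization (the isInitialized checks added in the regenerated code earlier in this commit), so an old-format request is rejected at parse time rather than being mis-dispatched. A small decoding sketch under that assumption, using only the generated accessors shown above:

    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;

    // Hypothetical decode helper; parseFrom rejects requests missing the
    // required declaringClassProtocolName/clientProtocolVersion fields.
    public class RequestDecodeSketch {
      static void describe(byte[] wireBytes) throws InvalidProtocolBufferException {
        HadoopRpcRequestProto req = HadoopRpcRequestProto.parseFrom(wireBytes);
        System.out.println("protocol=" + req.getDeclaringClassProtocolName()
            + " version=" + req.getClientProtocolVersion()
            + " method=" + req.getMethodName());
      }
    }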

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java Sun Dec  4 20:44:36 2011
@@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.TestSaslRPC.CustomSecurityInfo;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenIdentifier;
 import org.apache.hadoop.ipc.TestSaslRPC.TestTokenSecretManager;
@@ -101,7 +102,8 @@ public class TestAvroRpc extends TestCas
     RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
     RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
                                       ADDRESS, 0, 5, true, conf, sm);
-    server.addProtocol(AvroTestProtocol.class, new TestImpl());
+    server.addProtocol(RpcKind.RPC_WRITABLE, 
+        AvroTestProtocol.class, new TestImpl());
 
     try {
       server.start();

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1210208&r1=1210207&r2=1210208&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sun Dec  4 20:44:36 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 
@@ -96,8 +97,8 @@ public class TestIPC {
     }
 
     @Override
-    public Writable call(String protocol, Writable param, long receiveTime)
-        throws IOException {
+    public Writable call(RpcKind rpcKind, String protocol, Writable param,
+        long receiveTime) throws IOException {
       if (sleep) {
         // sleep a bit
         try {