You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/01/12 03:54:02 UTC

svn commit: r1230378 [1/2] - in /hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common: ./ src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/ipc/protobuf/ src/proto/ src/test/java/org/apache/hadoop/...

Author: szetszwo
Date: Thu Jan 12 02:54:01 2012
New Revision: 1230378

URL: http://svn.apache.org/viewvc?rev=1230378&view=rev
Log:
svn merge -c 1210208 from trunk for HADOOP-7862.

Modified:
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protobuf/HadoopRpcProtos.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/proto/hadoop_rpc.proto
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java
    hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1 +1 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166009,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1209246,1210208,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1213954,1214046,1220510,1221348,1226211,1227091,1227423

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jan 12 02:54:01 2012
@@ -23,6 +23,9 @@ Release 0.23-PB - Unreleased
 
     HADOOP-7776 Make the Ipc-Header in a RPC-Payload an explicit header (sanjay)
 
+    HADOOP-7862  Move the support for multiple protocols to lower layer so
+    that Writable, PB and Avro can all use it (Sanjay)
+
   BUG FIXES
 
     HADOOP-7833. Fix findbugs warnings in protobuf generated code.

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1,5 +1,5 @@
 /hadoop/common/branches/yahoo-merge/CHANGES.txt:1079157,1079163-1079164,1079167
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164771,1166009,1166402,1167318,1167383,1169986,1170046,1170379,1170459,1171297,1171894,1171909,1172186,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1179869,1182641,1183132,1189357,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204376,1204388,1205260,1206830,1207694,1208153,1208313,1209246,1210208,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213598,1214046,1220510,1221348,1226211,1226351,1227091,1227423
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278

Propchange: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 12 02:54:01 2012
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
+/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:1161777,1161781,1162008,1162188,1162421,1162491,1162499,1162613,1162928,1162954,1162979,1163050,1163069,1163456,1163465,1163490,1163768,1163852,1163858,1164255,1164301,1164339,1164771,1166402,1167318,1167383,1170379,1170459,1171297,1172916,1173402,1176550,1176986,1177002,1177035,1177399,1177487,1177531,1177859,1177864,1178639,1182641,1183132,1189932,1189982,1190109,1190611,1195575,1195760,1196113,1196129,1197885,1204114,1204117,1204122,1204124,1204129,1204131,1204363,1204370,1204376,1204388,1205260,1206786,1206830,1207694,1208153,1208313,1210208,1212021,1212062,1212073,1212084,1213537,1213586,1213592-1213593,1213954,1214046,1220510,1221348,1226211,1227091,1227423
 /hadoop/core/branches/branch-0.19/core/src/java:713112
 /hadoop/core/trunk/src/core:776175-785643,785929-786278

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -237,14 +238,15 @@ public class AvroRpcEngine implements Rp
       super((Class)null, new Object(), conf,
             bindAddress, port, numHandlers, numReaders,
             queueSizePerHandler, verbose, secretManager);
-      super.addProtocol(TunnelProtocol.class, responder);
+      // RpcKind is WRITABLE since Avro is tunneled through WRITABLE
+      super.addProtocol(RpcKind.RPC_WRITABLE, TunnelProtocol.class, responder);
       responder.addProtocol(iface, impl);
     }
 
 
     @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+    public Server
+      addProtocol(RpcKind rpcKind, Class<?> protocolClass, Object protocolImpl)
         throws IOException {
       responder.addProtocol(protocolClass, protocolImpl);
       return this;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Jan 12 02:54:01 2012
@@ -1003,17 +1003,19 @@ public class Client {
   }
 
   /**
-   * Same as {@link #call(RpcKind, Writable, ConnectionId)} for Writable
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   *  for RPC_BUILTIN
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
-    return call(RpcKind.RPC_WRITABLE, param, address);
+    return call(RpcKind.RPC_BUILTIN, param, address);
     
   }
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
@@ -1026,7 +1028,8 @@ public class Client {
    * the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception.
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, 
+   * ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1043,7 +1046,8 @@ public class Client {
    * timeout, returning the value.  
    * Throws exceptions if there are network problems or if the remote code 
    * threw an exception. 
-   * @deprecated Use {@link #call(RpcKind, Writable, ConnectionId)} instead 
+   * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable,
+   *  ConnectionId)} instead 
    */
   @Deprecated
   public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, 
@@ -1057,7 +1061,7 @@ public class Client {
 
   
   /**
-   * Same as {@link #call(RpcKind, Writable, InetSocketAddress, 
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, 
    * Class, UserGroupInformation, int, Configuration)}
    * except that rpcKind is writable.
    */
@@ -1067,7 +1071,7 @@ public class Client {
       throws InterruptedException, IOException {
         ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
         ticket, rpcTimeout, conf);
-        return call(RpcKind.RPC_WRITABLE, param, remoteId);
+    return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
   /**
@@ -1088,21 +1092,28 @@ public class Client {
   }
   
   /**
-   * Same as {link {@link #call(RpcKind, Writable, ConnectionId)}
-   * except the rpcKind is RPC_WRITABLE
+   * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)}
+   * except the rpcKind is RPC_BUILTIN
    */
   public Writable call(Writable param, ConnectionId remoteId)  
       throws InterruptedException, IOException {
-     return call(RpcKind.RPC_WRITABLE, param, remoteId);
+     return call(RpcKind.RPC_BUILTIN, param, remoteId);
   }
   
-  /** Make a call, passing <code>param</code>, to the IPC server defined by
-   * <code>remoteId</code>, returning the value.  
+  /** 
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
+   * 
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @return the rpc response
    * Throws exceptions if there are network problems or if the remote code 
-   * threw an exception. */
-  public Writable call(RpcKind rpcKind, Writable param, ConnectionId remoteId)  
-      throws InterruptedException, IOException {
-    Call call = new Call(rpcKind, param);
+   * threw an exception.
+   */
+  public Writable call(RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId) throws InterruptedException, IOException {
+    Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
 import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
@@ -60,6 +61,12 @@ import com.google.protobuf.ServiceExcept
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
+  
+  static { // Register the rpcRequest deserializer for ProtobufRpcEngine
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(
+        RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
+        new Server.ProtoBufRpcInvoker());
+  }
 
   private static final ClientCache CLIENTS = new ClientCache();
 
@@ -75,10 +82,13 @@ public class ProtobufRpcEngine implement
   }
 
   private static class Invoker implements InvocationHandler, Closeable {
-    private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+    private final Map<String, Message> returnTypes = 
+        new ConcurrentHashMap<String, Message>();
     private boolean isClosed = false;
-    private Client.ConnectionId remoteId;
-    private Client client;
+    private final Client.ConnectionId remoteId;
+    private final Client client;
+    private final long clientProtocolVersion;
+    private final String protocolName;
 
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
@@ -87,6 +97,8 @@ public class ProtobufRpcEngine implement
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory,
           RpcResponseWritable.class);
+      this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
+      this.protocolName = RPC.getProtocolName(protocol);
     }
 
     private HadoopRpcRequestProto constructRpcRequest(Method method,
@@ -108,6 +120,19 @@ public class ProtobufRpcEngine implement
 
       Message param = (Message) params[1];
       builder.setRequest(param.toByteString());
+      // For protobuf, {@code protocol} used when creating client side proxy is
+      // the interface extending BlockingInterface, which has the annotations 
+      // such as ProtocolName etc.
+      //
+      // Using Method.getDeclaringClass(), as in WritableEngine to get at
+      // the protocol interface will return BlockingInterface, from where 
+      // the annotation ProtocolName and Version cannot be
+      // obtained.
+      //
+      // Hence we simply use the protocol class used to create the proxy.
+      // For PB this may limit the use of mixins on client side.
+      builder.setDeclaringClassProtocolName(protocolName);
+      builder.setClientProtocolVersion(clientProtocolVersion);
       rpcRequest = builder.build();
       return rpcRequest;
     }
@@ -272,15 +297,16 @@ public class ProtobufRpcEngine implement
         RpcResponseWritable.class);
   }
   
+ 
 
   @Override
-  public RPC.Server getServer(Class<?> protocol, Object instance,
+  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager)
       throws IOException {
-    return new Server(instance, conf, bindAddress, port, numHandlers,
-        numReaders, queueSizePerHandler, verbose, secretManager);
+    return new Server(protocol, protocolImpl, conf, bindAddress, port,
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
   }
   
   private static RemoteException getRemoteException(Exception e) {
@@ -289,87 +315,31 @@ public class ProtobufRpcEngine implement
   }
 
   public static class Server extends RPC.Server {
-    private BlockingService service;
-    private boolean verbose;
-
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length - 1];
-    }
-
     /**
      * Construct an RPC server.
      * 
-     * @param instance the instance whose methods will be called
+     * @param protocolClass the class of protocol
+     * @param protocolImpl the protocolImpl whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      */
-    public Server(Object instance, Configuration conf, String bindAddress,
-        int port, int numHandlers, int numReaders, int queueSizePerHandler,
-        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager)
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress, int port, int numHandlers,
+        int numReaders, int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
         throws IOException {
       super(bindAddress, port, RpcRequestWritable.class, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(instance
+          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
               .getClass().getName()), secretManager);
-      this.service = (BlockingService) instance;
-      this.verbose = verbose;
+      this.verbose = verbose;  
+      registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, 
+          protocolClass, protocolImpl);
     }
 
-    /**
-     * This is a server side method, which is invoked over RPC. On success
-     * the return response has protobuf response payload. On failure, the
-     * exception name and the stack trace are return in the resposne. See {@link HadoopRpcResponseProto}
-     * 
-     * In this method there three types of exceptions possible and they are
-     * returned in response as follows.
-     * <ol>
-     * <li> Exceptions encountered in this method that are returned as {@link RpcServerException} </li>
-     * <li> Exceptions thrown by the service is wrapped in ServiceException. In that
-     * this method returns in response the exception thrown by the service.</li>
-     * <li> Other exceptions thrown by the service. They are returned as
-     * it is.</li>
-     * </ol>
-     */
-    @Override
-    public Writable call(String protocol, Writable writableRequest,
-        long receiveTime) throws IOException {
-      RpcRequestWritable request = (RpcRequestWritable) writableRequest;
-      HadoopRpcRequestProto rpcRequest = request.message;
-      String methodName = rpcRequest.getMethodName();
-      if (verbose)
-        LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
-      MethodDescriptor methodDescriptor = service.getDescriptorForType()
-          .findMethodByName(methodName);
-      if (methodDescriptor == null) {
-        String msg = "Unknown method " + methodName + " called on " + protocol
-            + " protocol.";
-        LOG.warn(msg);
-        return handleException(new RpcServerException(msg));
-      }
-      Message prototype = service.getRequestPrototype(methodDescriptor);
-      Message param = prototype.newBuilderForType()
-          .mergeFrom(rpcRequest.getRequest()).build();
-      Message result;
-      try {
-        result = service.callBlockingMethod(methodDescriptor, null, param);
-      } catch (ServiceException e) {
-        Throwable cause = e.getCause();
-        return handleException(cause != null ? cause : e);
-      } catch (Exception e) {
-        return handleException(e);
-      }
-
-      HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
-      return new RpcResponseWritable(response);
-    }
-
-    private RpcResponseWritable handleException(Throwable e) {
+    private static RpcResponseWritable handleException(Throwable e) {
       HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder()
           .setExceptionName(e.getClass().getName())
           .setStackTrace(StringUtils.stringifyException(e)).build();
@@ -378,7 +348,7 @@ public class ProtobufRpcEngine implement
       return new RpcResponseWritable(response);
     }
 
-    private HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
+    private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse(
         Message message) {
       HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder()
           .setResponse(message.toByteString())
@@ -386,5 +356,81 @@ public class ProtobufRpcEngine implement
           .build();
       return res;
     }
+    
+    /**
+     * Protobuf invoker for {@link RpcInvoker}
+     */
+    static class ProtoBufRpcInvoker implements RpcInvoker {
+
+      @Override 
+      /**
+       * This is a server side method, which is invoked over RPC. On success
+       * the return response has protobuf response payload. On failure, the
+       * exception name and the stack trace are returned in the response.
+       * See {@link HadoopRpcResponseProto}
+       * 
+       * In this method there are three types of exceptions possible and they are
+       * returned in response as follows.
+       * <ol>
+       * <li> Exceptions encountered in this method that are returned 
+       * as {@link RpcServerException} </li>
+       * <li> Exceptions thrown by the service are wrapped in ServiceException. 
+       * In that case this method returns in response the exception thrown by 
+       * the service.</li>
+       * <li> Other exceptions thrown by the service. They are returned as
+       * it is.</li>
+       * </ol>
+       */
+      public Writable call(RPC.Server server, String protocol,
+          Writable writableRequest, long receiveTime) throws IOException {
+        RpcRequestWritable request = (RpcRequestWritable) writableRequest;
+        HadoopRpcRequestProto rpcRequest = request.message;
+        String methodName = rpcRequest.getMethodName();
+        String protoName = rpcRequest.getDeclaringClassProtocolName();
+        long clientVersion = rpcRequest.getClientProtocolVersion();
+        if (server.verbose)
+          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
+        
+        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
+        ProtoClassProtoImpl protocolImpl = 
+            server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
+        if (protocolImpl == null) { // no match for Protocol AND Version
+          VerProtocolImpl highest = 
+              server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, 
+                  protoName);
+          if (highest == null) {
+            throw new IOException("Unknown protocol: " + protoName);
+          }
+          // protocol supported but not the version that client wants
+          throw new RPC.VersionMismatch(protoName, clientVersion,
+              highest.version);
+        }
+        
+        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
+        MethodDescriptor methodDescriptor = service.getDescriptorForType()
+            .findMethodByName(methodName);
+        if (methodDescriptor == null) {
+          String msg = "Unknown method " + methodName + " called on " + protocol
+              + " protocol.";
+          LOG.warn(msg);
+          return handleException(new RpcServerException(msg));
+        }
+        Message prototype = service.getRequestPrototype(methodDescriptor);
+        Message param = prototype.newBuilderForType()
+            .mergeFrom(rpcRequest.getRequest()).build();
+        Message result;
+        try {
+          result = service.callBlockingMethod(methodDescriptor, null, param);
+        } catch (ServiceException e) {
+          Throwable cause = e.getCause();
+          return handleException(cause != null ? cause : e);
+        } catch (Exception e) {
+          return handleException(e);
+        }
+  
+        HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result);
+        return new RpcResponseWritable(response);
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolInfo.java Thu Jan 12 02:54:01 2012
@@ -35,4 +35,5 @@ import java.lang.annotation.RetentionPol
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ProtocolInfo {
   String protocolName();  // the name of the protocol (i.e. rpc service)
+  long protocolVersion() default -1; // default means not defined use old way
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolProxy.java Thu Jan 12 02:54:01 2012
@@ -57,19 +57,11 @@ public class ProtocolProxy<T> {
   
   private void fetchServerMethods(Method method) throws IOException {
     long clientVersion;
-    try {
-      Field versionField = method.getDeclaringClass().getField("versionID");
-      versionField.setAccessible(true);
-      clientVersion = versionField.getLong(method.getDeclaringClass());
-    } catch (NoSuchFieldException ex) {
-      throw new RuntimeException(ex);
-    } catch (IllegalAccessException ex) {
-      throw new RuntimeException(ex);
-    }
+    clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
     int clientMethodsHash = ProtocolSignature.getFingerprint(method
         .getDeclaringClass().getMethods());
     ProtocolSignature serverInfo = ((VersionedProtocol) proxy)
-        .getProtocolSignature(protocol.getName(), clientVersion,
+        .getProtocolSignature(RPC.getProtocolName(protocol), clientVersion,
             clientMethodsHash);
     long serverVersion = serverInfo.getVersion();
     if (serverVersion != clientVersion) {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolSignature.java Thu Jan 12 02:54:01 2012
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ProtocolSignature implements Writable {
   static {               // register a ctor
     WritableFactories.setFactory
@@ -164,10 +166,15 @@ public class ProtocolSignature implement
   /**
    * A cache that maps a protocol's name to its signature & finger print
    */
-  final private static HashMap<String, ProtocolSigFingerprint> 
+  private final static HashMap<String, ProtocolSigFingerprint> 
      PROTOCOL_FINGERPRINT_CACHE = 
        new HashMap<String, ProtocolSigFingerprint>();
   
+  @VisibleForTesting
+  public static void resetCache() { // test hook: empties the fingerprint cache
+    synchronized (PROTOCOL_FINGERPRINT_CACHE) { PROTOCOL_FINGERPRINT_CACHE.clear(); } // same monitor as the cache lookup
+  }
+  
   /**
    * Return a protocol's signature and finger print from cache
    * 
@@ -177,7 +184,7 @@ public class ProtocolSignature implement
    */
   private static ProtocolSigFingerprint getSigFingerprint(
       Class <? extends VersionedProtocol> protocol, long serverVersion) {
-    String protocolName = protocol.getName();
+    String protocolName = RPC.getProtocolName(protocol);
     synchronized (PROTOCOL_FINGERPRINT_CACHE) {
       ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
       if (sig == null) {

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Thu Jan 12 02:54:01 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
@@ -28,6 +29,9 @@ import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
 import java.io.*;
 import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
@@ -36,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,8 +68,54 @@ import org.apache.hadoop.util.Reflection
  * the protocol instance is transmitted.
  */
 public class RPC {
+  
+  interface RpcInvoker {   // an instance is registered per rpc kind via Server.registerProtocolEngine
+    /**
+     * Process a client call on the server side.
+     * @param server the server within whose context this rpc call is made
+     * @param protocol the protocol name (the class of the client proxy
+     *      used to make calls to the rpc server)
+     * @param rpcRequest the deserialized rpc request
+     * @param receiveTime time at which the call was received (for metrics)
+     * @return the call's return value
+     * @throws IOException declared by the signature; conditions are implementation specific
+     **/
+    public Writable call(Server server, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException ;
+  }
+  
   static final Log LOG = LogFactory.getLog(RPC.class);
   
+  /**
+   * Walk the given interfaces and, recursively, their super-interfaces,
+   * keeping each one that is VersionedProtocol or extends it. Any other
+   * interface is skipped with a warning and its parents are not visited.
+   * @param childInterfaces the interfaces to start from
+   * @return the matching interfaces, in visit order, duplicates included
+   */
+  static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
+    List<Class<?>> found = new ArrayList<Class<?>>();
+    for (Class<?> candidate : childInterfaces) {
+      if (!VersionedProtocol.class.isAssignableFrom(candidate)) {
+        LOG.warn("Interface " + candidate +
+              " ignored because it does not extend VersionedProtocol");
+        continue;
+      }
+      found.add(candidate);
+      Class<?>[] parents = getSuperInterfaces(candidate.getInterfaces());
+      found.addAll(Arrays.asList(parents));
+    }
+    return found.toArray(new Class[found.size()]);
+  }
+  
+  /**
+   * Return the VersionedProtocol-derived interfaces that the given class
+   * directly declares, together with their qualifying super-interfaces.
+   */
+  static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
+    // the filtering and the recursive walk are done by getSuperInterfaces
+    return getSuperInterfaces(protocol.getInterfaces());
+  }
   
   /**
    * Get the protocol name.
@@ -75,9 +126,36 @@ public class RPC {
     if (protocol == null) {
       return null;
     }
-    ProtocolInfo anno = (ProtocolInfo) protocol.getAnnotation(ProtocolInfo.class);
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
     return  (anno == null) ? protocol.getName() : anno.protocolName();
   }
+  
+  /**
+   * Get the protocol version from protocol class.
+   * If the class has a ProtocolInfo annotation declaring a version other than
+   * -1, return it; otherwise read the class's versionID field via reflection.
+   */
+  static public long getProtocolVersion(Class<?> protocol) {
+    if (protocol == null) {
+      throw new IllegalArgumentException("Null protocol");
+    }
+    long version;
+    ProtocolInfo anno = protocol.getAnnotation(ProtocolInfo.class);
+    if (anno != null) {
+      version = anno.protocolVersion();
+      if (version != -1) // -1 is the annotation's default, i.e. no version declared there
+        return version;
+    }
+    try { // fall back to the versionID field
+      Field versionField = protocol.getField("versionID");
+      versionField.setAccessible(true);
+      return versionField.getLong(protocol);
+    } catch (NoSuchFieldException ex) {
+      throw new RuntimeException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
 
   private RPC() {}                                  // no public ctor
 
@@ -589,6 +667,144 @@ public class RPC {
 
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
+   boolean verbose;
+   /** Return the text after the last '.' of className (all of it if there is no dot). */
+   static String classNameBase(String className) {
+      // lastIndexOf yields -1 when there is no dot, so substring starts at 0;
+      // this matches the old split("\\.", -1) result without building an
+      // array or keeping its unreachable null/empty-array branch.
+      return className.substring(className.lastIndexOf('.') + 1);
+    }
+   
+   /**
+    * Store a map of protocol and version to its implementation
+    */
+   /**
+    * Map key: a protocol name paired with one version of that protocol.
+    */
+   static class ProtoNameVer {
+     final String protocol;
+     final long   version;
+     ProtoNameVer(String protocol, long ver) {
+       this.protocol = protocol;
+       this.version = ver;
+     }
+     @Override
+     public boolean equals(Object o) {
+       if (this == o) {
+         return true;
+       }
+       if (!(o instanceof ProtoNameVer)) { // instanceof is false for null
+         return false;
+       }
+       ProtoNameVer other = (ProtoNameVer) o;
+       return other.protocol.equals(protocol) && other.version == version;
+     }
+     /** Combines the name's hash with the low 32 bits of the version. */
+     @Override
+     public int hashCode() {
+       return 37 * protocol.hashCode() + (int) version;
+     }
+   }
+   
+   /**
+    * The value in map: a protocol class and the object registered as its impl
+    */
+   static class ProtoClassProtoImpl {
+     final Class<?> protocolClass;
+     final Object protocolImpl; 
+     ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
+       this.protocolClass = protocolClass;
+       this.protocolImpl = protocolImpl;
+     }
+   }
+
+   ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = 
+       new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
+   
+   /** Return the map for rpcKind, first creating all maps if none exist yet. */
+   Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) {
+     if (protocolImplMapArray.isEmpty()) {
+       for (int slot = 0; slot <= RpcKind.MAX_INDEX; slot++) { // slots 0..MAX_INDEX
+         protocolImplMapArray.add(new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10));
+       }
+     }
+     return protocolImplMapArray.get(rpcKind.ordinal()); // looked up by ordinal
+   }
+   
+   /**
+    * Register protocolClass and its impl under rpcKind, keyed by protocol name
+    * and version; if the version cannot be determined, only log a warning.
+    */
+   void registerProtocolAndImpl(RpcKind rpcKind, Class<?> protocolClass, 
+       Object protocolImpl) throws IOException {
+     String protocolName = RPC.getProtocolName(protocolClass);
+     long version;
+     try {
+       version = RPC.getProtocolVersion(protocolClass);
+     } catch (Exception ex) {
+       // hand ex to the logger so the reason for the failure is not lost
+       LOG.warn("Protocol "  + protocolClass + 
+            " NOT registered as cannot get protocol version ", ex);
+       return;
+     }
+     getProtocolImplMap(rpcKind).put(new ProtoNameVer(protocolName, version),
+         new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
+     LOG.debug("RpcKind = " + rpcKind + " Protocol Name = " + protocolName +  " version=" + version +
+         " ProtocolImpl=" + protocolImpl.getClass().getName() + 
+         " protocolClass=" + protocolClass.getName());
+   }
+   
+   static class VerProtocolImpl { // result holder: a version plus the class/impl registered for it
+     final long version;
+     final ProtoClassProtoImpl protocolTarget;
+     VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
+       this.version = ver;
+       this.protocolTarget = protocolTarget;
+     }
+   }
+   
+   
+   /**
+    * Collect every registered version of protocolName for rpcKind.
+    * @return one entry per matching registration, or null if there is none
+    */
+   @SuppressWarnings("unused") // will be useful later.
+   VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind,
+       String protocolName) {
+     List<VerProtocolImpl> matches = new ArrayList<VerProtocolImpl>();
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> entry :
+         getProtocolImplMap(rpcKind).entrySet()) {
+       ProtoNameVer key = entry.getKey();
+       if (key.protocol.equals(protocolName)) {
+         matches.add(new VerProtocolImpl(key.version, entry.getValue()));
+       }
+     }
+     if (matches.isEmpty()) {
+       return null;
+     }
+     return matches.toArray(new VerProtocolImpl[matches.size()]);
+   }
+   
+   /** Return the highest registered version of protocolName for rpcKind, or null if none. */
+   VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, 
+       String protocolName) {    
+     long highestVersion = 0L;
+     ProtoClassProtoImpl highest = null;
+     Map<ProtoNameVer, ProtoClassProtoImpl> implMap = getProtocolImplMap(rpcKind);
+     if (LOG.isDebugEnabled()) { // replaces a stray System.out.println that ran on every lookup
+       LOG.debug("Size of protoMap for " + rpcKind + " =" + implMap.size());
+     }
+     for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : implMap.entrySet()) {
+       if (pv.getKey().protocol.equals(protocolName)) {
+         if ((highest == null) || (pv.getKey().version > highestVersion)) { // first match always taken
+           highest = pv.getValue();
+           highestVersion = pv.getKey().version;
+         }
+       }
+     }
+     return (highest == null) ? null : new VerProtocolImpl(highestVersion, highest);
+   }
   
     protected Server(String bindAddress, int port, 
                      Class<? extends Writable> paramClass, int handlerCount,
@@ -605,11 +821,17 @@ public class RPC {
      * @param protocolImpl - the impl of the protocol that will be called
      * @return the server (for convenience)
      */
-    public <PROTO, IMPL extends PROTO>
-      Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
-    ) throws IOException {
-      throw new IOException("addProtocol Not Implemented");
+    public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
+        Object protocolImpl) throws IOException {
+      registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl);
+      return this;
+    }
+    
+    @Override
+    public Writable call(RpcKind rpcKind, String protocol,
+        Writable rpcRequest, long receiveTime) throws IOException {
+      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, // delegate to the invoker registered for rpcKind
+          receiveTime); // NOTE(review): getRpcInvoker yields null for an unregistered kind, which would NPE here -- confirm
+    }
   }
-
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java Thu Jan 12 02:54:01 2012
@@ -54,13 +54,14 @@ public class RpcPayloadHeader implements
   }
   
   public enum RpcKind {
-    RPC_BUILTIN ((short ) 1),  // Used for built in calls
-    RPC_WRITABLE ((short ) 2),
-    RPC_PROTOCOL_BUFFER ((short)3), 
-    RPC_AVRO ((short)4);
-    
+    RPC_BUILTIN ((short) 1),         // Used for built in calls by tests
+    RPC_WRITABLE ((short) 2),        // Use WritableRpcEngine 
+    RPC_PROTOCOL_BUFFER ((short) 3), // Use ProtobufRpcEngine
+    RPC_AVRO ((short) 4);            // Use AvroRpcEngine 
+    static final short MAX_INDEX = RPC_AVRO.value; // used for array size
+    private static final short FIRST_INDEX = RPC_BUILTIN.value;    
     private final short value;
-    private static final short FIRST_INDEX = RPC_BUILTIN.value;
+
     RpcKind(short val) {
       this.value = val;
     }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Jan 12 02:54:01 2012
@@ -43,6 +43,7 @@ import java.nio.channels.WritableByteCha
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -67,6 +68,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@@ -117,6 +119,59 @@ public abstract class Server {
    * Initial and max size of response buffer
    */
   static int INITIAL_RESP_BUF_SIZE = 10240;
+  
+  static class RpcKindMapValue { // value stored per RpcKind: request wrapper class + invoker
+    final Class<? extends Writable> rpcRequestWrapperClass;
+    final RpcInvoker rpcInvoker;
+    RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+      this.rpcInvoker = rpcInvoker;
+      this.rpcRequestWrapperClass = rpcRequestWrapperClass;
+    }   
+  }
+  static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
+      HashMap<RpcKind, RpcKindMapValue>(4);
+  
+  
+
+  /**
+   * Register a RPC kind and the class to deserialize the rpc request.
+   * 
+   * Called by static initializers of rpcKind Engines
+   * @param rpcKind the rpc kind being registered
+   * @param rpcRequestWrapperClass - this class is used to deserialize
+   *  the rpc request.
+   * @param rpcInvoker - used to process the calls on the server side.
+   * @throws IllegalArgumentException if rpcKind is already registered
+   */
+  public static void registerProtocolEngine(RpcKind rpcKind, 
+          Class<? extends Writable> rpcRequestWrapperClass,
+          RpcInvoker rpcInvoker) {
+    // check first so an existing entry is never replaced, not even transiently
+    if (rpcKindMap.containsKey(rpcKind)) {
+      throw new IllegalArgumentException("ReRegistration of rpcKind: " +
+          rpcKind);      
+    }
+    rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
+    LOG.info("rpcKind=" + rpcKind + 
+        ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + 
+        ", rpcInvoker=" + rpcInvoker);
+  }
+  
+  
+  public Class<? extends Writable> getRpcRequestWrapper(RpcKind rpcKind) {
+    if (rpcRequestClass != null) { // a class given to the constructor wins
+      return rpcRequestClass;
+    }
+    RpcKindMapValue entry = rpcKindMap.get(rpcKind); // null when unregistered
+    return (entry != null) ? entry.rpcRequestWrapperClass : null;
+  }
+  
+  public static RpcInvoker  getRpcInvoker(RpcKind rpcKind) {
+    RpcKindMapValue entry = rpcKindMap.get(rpcKind); // null when unregistered
+    return (entry != null) ? entry.rpcInvoker : null;
+  }
+  
 
   public static final Log LOG = LogFactory.getLog(Server.class);
   public static final Log AUDITLOG = 
@@ -181,7 +236,7 @@ public abstract class Server {
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
   private int readThreads;                        // number of read threads
-  private Class<? extends Writable> paramClass;   // class of call parameters
+  private Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc request
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
   private int thresholdIdleConnections;           // the number of idle connections
@@ -1394,9 +1449,27 @@ public abstract class Server {
         throw new IOException("IPC Server does not implement operation" + 
               header.getOperation());
       }
+      // If we know the rpc kind, get its class so that we can deserialize
+      // (Note it would make more sense to have the handler deserialize but 
+      // we continue with this original design.
+      Class<? extends Writable> rpcRequestClass = 
+          getRpcRequestWrapper(header.getkind());
+      if (rpcRequestClass == null) {
+        LOG.warn("Unknown rpc kind "  + header.getkind() + 
+            " from client " + getHostAddress());
+        final Call readParamsFailedCall = 
+            new Call(header.getCallId(), null, this);
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+            IOException.class.getName(),
+            "Unknown rpc kind "  + header.getkind());
+        responder.doRespond(readParamsFailedCall);
+        return;   
+      }
       Writable rpcRequest;
       try { //Read the rpc request
-        rpcRequest = ReflectionUtils.newInstance(paramClass, conf);
+        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
         rpcRequest.readFields(dis);
       } catch (Throwable t) {
         LOG.warn("Unable to read call parameters for client " +
@@ -1488,7 +1561,7 @@ public abstract class Server {
             // Make the call as the user via Subject.doAs, thus associating
             // the call with the Subject
             if (call.connection.user == null) {
-              value = call(call.connection.protocolName, call.rpcRequest, 
+              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                            call.timestamp);
             } else {
               value = 
@@ -1497,7 +1570,7 @@ public abstract class Server {
                      @Override
                      public Writable run() throws Exception {
                        // make the call
-                       return call(call.connection.protocolName, 
+                       return call(call.rpcKind, call.connection.protocolName, 
                                    call.rpcRequest, call.timestamp);
 
                      }
@@ -1550,24 +1623,33 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
+        .toString(port), null);
   }
   
-  /** Constructs a server listening on the named port and address.  Parameters passed must
+  /** 
+   * Constructs a server listening on the named port and address.  Parameters passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
    * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
    * from configuration. Otherwise the configuration will be picked up.
+   * 
+   * If rpcRequestClass is null then the rpcRequestClass must have been 
+   * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
+   *  Class, RPC.RpcInvoker)}
+   * This parameter has been retained for compatibility with existing tests
+   * and usage.
    */
   @SuppressWarnings("unchecked")
-  protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
-                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
     this.port = port;
-    this.paramClass = paramClass;
+    this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     if (queueSizePerHandler != -1) {
@@ -1765,17 +1847,17 @@ public abstract class Server {
   
   /** 
    * Called for each call. 
-   * @deprecated Use {@link #call(String, Writable, long)} instead
+   * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,
+   *  Writable, long)} instead
    */
   @Deprecated
   public Writable call(Writable param, long receiveTime) throws IOException {
-    return call(null, param, receiveTime);
+    return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
   }
   
   /** Called for each call. */
-  public abstract Writable call(String protocol,
-                               Writable param, long receiveTime)
-  throws IOException;
+  public abstract Writable call(RpcKind rpcKind, String protocol,
+      Writable param, long receiveTime) throws IOException;
   
   /**
    * Authorize the incoming client connection.
@@ -1925,5 +2007,5 @@ public abstract class Server {
 
     int nBytes = initialRemaining - buf.remaining(); 
     return (nBytes > 0) ? nBytes : ret;
-  }      
+  }
 }

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1230378&r1=1230377&r2=1230378&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Jan 12 02:54:01 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
 import java.lang.reflect.Array;
@@ -27,18 +26,14 @@ import java.lang.reflect.InvocationTarge
 
 import java.net.InetSocketAddress;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.io.Closeable;
-import java.util.Map;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -53,36 +48,9 @@ import org.apache.hadoop.conf.*;
 public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
- 
-  /**
-   * Get all superInterfaces that extend VersionedProtocol
-   * @param childInterfaces
-   * @return the super interfaces that extend VersionedProtocol
-   */
-  private static Class<?>[] getSuperInterfaces(Class<?>[] childInterfaces) {
-    List<Class<?>> allInterfaces = new ArrayList<Class<?>>();
-
-    for (Class<?> childInterface : childInterfaces) {
-      if (VersionedProtocol.class.isAssignableFrom(childInterface)) {
-          allInterfaces.add(childInterface);
-          allInterfaces.addAll(
-              Arrays.asList(
-                  getSuperInterfaces(childInterface.getInterfaces())));
-      } else {
-        LOG.warn("Interface " + childInterface +
-              " ignored because it does not extend VersionedProtocol");
-      }
-    }
-    return (Class<?>[]) allInterfaces.toArray(new Class[allInterfaces.size()]);
-  }
-  
-  /**
-   * Get all interfaces that the given protocol implements or extends
-   * which are assignable from VersionedProtocol.
-   */
-  private static Class<?>[] getProtocolInterfaces(Class<?> protocol) {
-    Class<?>[] interfaces  = protocol.getInterfaces();
-    return getSuperInterfaces(interfaces);
+  static { // Register the rpcRequest deserializer for WritableRpcEngine 
+    org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE,
+        Invocation.class, new Server.WritableRpcInvoker());
   }
 
   
@@ -120,15 +88,7 @@ public class WritableRpcEngine implement
         clientVersion = 0;
         clientMethodsHash = 0;
       } else {
-        try {
-          Field versionField = method.getDeclaringClass().getField("versionID");
-          versionField.setAccessible(true);
-          this.clientVersion = versionField.getLong(method.getDeclaringClass());
-        } catch (NoSuchFieldException ex) {
-          throw new RuntimeException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new RuntimeException(ex);
-        }
+        this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass());
         this.clientMethodsHash = ProtocolSignature.getFingerprint(method
             .getDeclaringClass().getMethods());
       }
@@ -329,140 +289,25 @@ public class WritableRpcEngine implement
 
   /** An RPC Server. */
   public static class Server extends RPC.Server {
-    private boolean verbose;
-    
     /**
-     *  The key in Map
-     */
-    static class ProtoNameVer {
-      final String protocol;
-      final long   version;
-      ProtoNameVer(String protocol, long ver) {
-        this.protocol = protocol;
-        this.version = ver;
-      }
-      @Override
-      public boolean equals(Object o) {
-        if (o == null) 
-          return false;
-        if (this == o) 
-          return true;
-        if (! (o instanceof ProtoNameVer))
-          return false;
-        ProtoNameVer pv = (ProtoNameVer) o;
-        return ((pv.protocol.equals(this.protocol)) && 
-            (pv.version == this.version));     
-      }
-      @Override
-      public int hashCode() {
-        return protocol.hashCode() * 37 + (int) version;    
-      }
-    }
-    
-    /**
-     * The value in map
-     */
-    static class ProtoClassProtoImpl {
-      final Class<?> protocolClass;
-      final Object protocolImpl; 
-      ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
-        this.protocolClass = protocolClass;
-        this.protocolImpl = protocolImpl;
-      }
-    }
-    
-    private Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMap = 
-        new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);
-    
-    // Register  protocol and its impl for rpc calls
-    private void registerProtocolAndImpl(Class<?> protocolClass, 
-        Object protocolImpl) throws IOException {
-      String protocolName = RPC.getProtocolName(protocolClass);
-      VersionedProtocol vp = (VersionedProtocol) protocolImpl;
-      long version;
-      try {
-        version = vp.getProtocolVersion(protocolName, 0);
-      } catch (Exception ex) {
-        LOG.warn("Protocol "  + protocolClass + 
-             " NOT registered as getProtocolVersion throws exception ");
-        return;
-      }
-      protocolImplMap.put(new ProtoNameVer(protocolName, version),
-          new ProtoClassProtoImpl(protocolClass, protocolImpl)); 
-      LOG.info("Protocol Name = " + protocolName +  " version=" + version +
-          " ProtocolImpl=" + protocolImpl.getClass().getName() + 
-          " protocolClass=" + protocolClass.getName());
-    }
-    
-    private static class VerProtocolImpl {
-      final long version;
-      final ProtoClassProtoImpl protocolTarget;
-      VerProtocolImpl(long ver, ProtoClassProtoImpl protocolTarget) {
-        this.version = ver;
-        this.protocolTarget = protocolTarget;
-      }
-    }
-    
-    
-    @SuppressWarnings("unused") // will be useful later.
-    private VerProtocolImpl[] getSupportedProtocolVersions(
-        String protocolName) {
-      VerProtocolImpl[] resultk = new  VerProtocolImpl[protocolImplMap.size()];
-      int i = 0;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
-                                        protocolImplMap.entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          resultk[i++] = 
-              new VerProtocolImpl(pv.getKey().version, pv.getValue());
-        }
-      }
-      if (i == 0) {
-        return null;
-      }
-      VerProtocolImpl[] result = new VerProtocolImpl[i];
-      System.arraycopy(resultk, 0, result, 0, i);
-      return result;
-    }
-    
-    private VerProtocolImpl getHighestSupportedProtocol(String protocolName) {    
-      Long highestVersion = 0L;
-      ProtoClassProtoImpl highest = null;
-      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : protocolImplMap
-          .entrySet()) {
-        if (pv.getKey().protocol.equals(protocolName)) {
-          if ((highest == null) || (pv.getKey().version > highestVersion)) {
-            highest = pv.getValue();
-            highestVersion = pv.getKey().version;
-          } 
-        }
-      }
-      if (highest == null) {
-        return null;
-      }
-      return new VerProtocolImpl(highestVersion,  highest);   
-    }
- 
-
-    /** Construct an RPC server.
+     * Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
      * 
-     * @deprecated Use #Server(Class, Object, Configuration, String, int)
-     *    
+     * @deprecated Use #Server(Class, Object, Configuration, String, int)    
      */
     @Deprecated
     public Server(Object instance, Configuration conf, String bindAddress,
-        int port) 
-      throws IOException {
+        int port) throws IOException {
       this(null, instance, conf,  bindAddress, port);
     }
     
     
     /** Construct an RPC server.
-     * @param protocol class
-     * @param instance the instance whose methods will be called
+     * @param protocolClass class
+     * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
      * @param port the port to listen for connections on
@@ -474,16 +319,8 @@ public class WritableRpcEngine implement
           false, null);
     }
     
-    private static String classNameBase(String className) {
-      String[] names = className.split("\\.", -1);
-      if (names == null || names.length == 0) {
-        return className;
-      }
-      return names[names.length-1];
-    }
-    
-    
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolImpl the instance whose methods will be called
      * @param conf the configuration to use
      * @param bindAddress the address to bind on to listen for connection
@@ -505,7 +342,8 @@ public class WritableRpcEngine implement
    
     }
     
-    /** Construct an RPC server.
+    /** 
+     * Construct an RPC server.
      * @param protocolClass - the protocol being registered
      *     can be null for compatibility with old usage (see below for details)
      * @param protocolImpl the protocol impl that will be called
@@ -520,7 +358,7 @@ public class WritableRpcEngine implement
         int numHandlers, int numReaders, int queueSizePerHandler, 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+      super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
           classNameBase(protocolImpl.getClass().getName()), secretManager);
 
@@ -535,7 +373,7 @@ public class WritableRpcEngine implement
          * the protocolImpl is derived from the protocolClass(es) 
          * we register all interfaces extended by the protocolImpl
          */
-        protocols = getProtocolInterfaces(protocolImpl.getClass());
+        protocols = RPC.getProtocolInterfaces(protocolImpl.getClass());
 
       } else {
         if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
@@ -544,132 +382,125 @@ public class WritableRpcEngine implement
               protocolImpl.getClass());
         }
         // register protocol class and its super interfaces
-        registerProtocolAndImpl(protocolClass, protocolImpl);
-        protocols = getProtocolInterfaces(protocolClass);
+        registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
+        protocols = RPC.getProtocolInterfaces(protocolClass);
       }
       for (Class<?> p : protocols) {
         if (!p.equals(VersionedProtocol.class)) {
-          registerProtocolAndImpl(p, protocolImpl);
+          registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl);
         }
       }
 
     }
 
- 
-    @Override
-    public <PROTO, IMPL extends PROTO> Server
-      addProtocol(
-        Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
-      registerProtocolAndImpl(protocolClass, protocolImpl);
-      return this;
+    private static void log(String value) {
+      if (value!= null && value.length() > 55)
+        value = value.substring(0, 55)+"...";
+      LOG.info(value);
     }
     
-    /**
-     * Process a client call
-     * @param protocolName - the protocol name (the class of the client proxy
-     *      used to make calls to the rpc server.
-     * @param param  parameters
-     * @param receivedTime time at which the call receoved (for metrics)
-     * @return the call's return
-     * @throws IOException
-     */
-    public Writable call(String protocolName, Writable param, long receivedTime) 
-    throws IOException {
-      try {
-        Invocation call = (Invocation)param;
-        if (verbose) log("Call: " + call);
-
-        // Verify rpc version
-        if (call.getRpcVersion() != writableRpcVersion) {
-          // Client is using a different version of WritableRpc
-          throw new IOException(
-              "WritableRpc version mismatch, client side version="
-                  + call.getRpcVersion() + ", server side version="
-                  + writableRpcVersion);
-        }
+    static class WritableRpcInvoker implements RpcInvoker {
+
+     @Override
+      public Writable call(org.apache.hadoop.ipc.RPC.Server server,
+          String protocolName, Writable rpcRequest, long receivedTime)
+          throws IOException {
+        try {
+          Invocation call = (Invocation)rpcRequest;
+          if (server.verbose) log("Call: " + call);
 
-        long clientVersion = call.getProtocolVersion();
-        final String protoName;
-        ProtoClassProtoImpl protocolImpl;
-        if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
-          // VersionProtocol methods are often used by client to figure out
-          // which version of protocol to use.
-          //
-          // Versioned protocol methods should go the protocolName protocol
-          // rather than the declaring class of the method since the
-          // the declaring class is VersionedProtocol which is not 
-          // registered directly.
-          // Send the call to the highest  protocol version
-          protocolImpl = 
-              getHighestSupportedProtocol(protocolName).protocolTarget;
-        } else {
-          protoName = call.declaringClassProtocolName;
-
-          // Find the right impl for the protocol based on client version.
-          ProtoNameVer pv = 
-              new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
-          protocolImpl = protocolImplMap.get(pv);
-          if (protocolImpl == null) { // no match for Protocol AND Version
-             VerProtocolImpl highest = 
-                 getHighestSupportedProtocol(protoName);
+          // Verify rpc version
+          if (call.getRpcVersion() != writableRpcVersion) {
+            // Client is using a different version of WritableRpc
+            throw new IOException(
+                "WritableRpc version mismatch, client side version="
+                    + call.getRpcVersion() + ", server side version="
+                    + writableRpcVersion);
+          }
+
+          long clientVersion = call.getProtocolVersion();
+          final String protoName;
+          ProtoClassProtoImpl protocolImpl;
+          if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) {
+            // VersionProtocol methods are often used by client to figure out
+            // which version of protocol to use.
+            //
+            // Versioned protocol methods should go the protocolName protocol
+            // rather than the declaring class of the method since the
+            // the declaring class is VersionedProtocol which is not 
+            // registered directly.
+            // Send the call to the highest  protocol version
+            VerProtocolImpl highest = server.getHighestSupportedProtocol(
+                RpcKind.RPC_WRITABLE, protocolName);
             if (highest == null) {
-              throw new IOException("Unknown protocol: " + protoName);
-            } else { // protocol supported but not the version that client wants
-              throw new RPC.VersionMismatch(protoName, clientVersion,
-                highest.version);
+              throw new IOException("Unknown protocol: " + protocolName);
+            }
+            protocolImpl = highest.protocolTarget;
+          } else {
+            protoName = call.declaringClassProtocolName;
+
+            // Find the right impl for the protocol based on client version.
+            ProtoNameVer pv = 
+                new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
+            protocolImpl = 
+                server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv);
+            if (protocolImpl == null) { // no match for Protocol AND Version
+               VerProtocolImpl highest = 
+                   server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, 
+                       protoName);
+              if (highest == null) {
+                throw new IOException("Unknown protocol: " + protoName);
+              } else { // protocol supported but not the version that client wants
+                throw new RPC.VersionMismatch(protoName, clientVersion,
+                  highest.version);
+              }
             }
           }
-        }
-        
+          
 
-        // Invoke the protocol method
+          // Invoke the protocol method
 
-        long startTime = System.currentTimeMillis();
-        Method method = 
-            protocolImpl.protocolClass.getMethod(call.getMethodName(),
-            call.getParameterClasses());
-        method.setAccessible(true);
-        rpcDetailedMetrics.init(protocolImpl.protocolClass);
-        Object value = 
-            method.invoke(protocolImpl.protocolImpl, call.getParameters());
-        int processingTime = (int) (System.currentTimeMillis() - startTime);
-        int qTime = (int) (startTime-receivedTime);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Served: " + call.getMethodName() +
-                    " queueTime= " + qTime +
-                    " procesingTime= " + processingTime);
-        }
-        rpcMetrics.addRpcQueueTime(qTime);
-        rpcMetrics.addRpcProcessingTime(processingTime);
-        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
-                                             processingTime);
-        if (verbose) log("Return: "+value);
-
-        return new ObjectWritable(method.getReturnType(), value);
-
-      } catch (InvocationTargetException e) {
-        Throwable target = e.getTargetException();
-        if (target instanceof IOException) {
-          throw (IOException)target;
-        } else {
-          IOException ioe = new IOException(target.toString());
-          ioe.setStackTrace(target.getStackTrace());
+          long startTime = System.currentTimeMillis();
+          Method method = 
+              protocolImpl.protocolClass.getMethod(call.getMethodName(),
+              call.getParameterClasses());
+          method.setAccessible(true);
+          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
+          Object value = 
+              method.invoke(protocolImpl.protocolImpl, call.getParameters());
+          int processingTime = (int) (System.currentTimeMillis() - startTime);
+          int qTime = (int) (startTime-receivedTime);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Served: " + call.getMethodName() +
+                      " queueTime= " + qTime +
+                      " procesingTime= " + processingTime);
+          }
+          server.rpcMetrics.addRpcQueueTime(qTime);
+          server.rpcMetrics.addRpcProcessingTime(processingTime);
+          server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
+                                               processingTime);
+          if (server.verbose) log("Return: "+value);
+
+          return new ObjectWritable(method.getReturnType(), value);
+
+        } catch (InvocationTargetException e) {
+          Throwable target = e.getTargetException();
+          if (target instanceof IOException) {
+            throw (IOException)target;
+          } else {
+            IOException ioe = new IOException(target.toString());
+            ioe.setStackTrace(target.getStackTrace());
+            throw ioe;
+          }
+        } catch (Throwable e) {
+          if (!(e instanceof IOException)) {
+            LOG.error("Unexpected throwable object ", e);
+          }
+          IOException ioe = new IOException(e.toString());
+          ioe.setStackTrace(e.getStackTrace());
           throw ioe;
         }
-      } catch (Throwable e) {
-        if (!(e instanceof IOException)) {
-          LOG.error("Unexpected throwable object ", e);
-        }
-        IOException ioe = new IOException(e.toString());
-        ioe.setStackTrace(e.getStackTrace());
-        throw ioe;
       }
     }
   }
-
-  private static void log(String value) {
-    if (value!= null && value.length() > 55)
-      value = value.substring(0, 55)+"...";
-    LOG.info(value);
-  }
 }